Browse Source

- switched to much better tcp timers (performance increase especially
with tesn of thousands of active tcp connections)

Andrei Pelinescu-Onciul 18 years ago
parent
commit
ccb7fda23b
5 changed files with 262 additions and 251 deletions
  1. 1 1
      Makefile.defs
  2. 3 0
      local_timer.h
  3. 4 0
      tcp_conn.h
  4. 202 74
      tcp_main.c
  5. 52 176
      tcp_read.c

+ 1 - 1
Makefile.defs

@@ -77,7 +77,7 @@ MAIN_NAME=ser
 VERSION = 2
 PATCHLEVEL = 1
 SUBLEVEL =  0
-EXTRAVERSION = -dev12
+EXTRAVERSION = -dev13
 
 SER_VER = $(shell expr $(VERSION) \* 1000000 + $(PATCHLEVEL) \* 1000 + \
 			$(SUBLEVEL) )

+ 3 - 0
local_timer.h

@@ -37,6 +37,9 @@
 #ifndef _local_timer_h
 #define _local_timer_h
 
+#include "timer_ticks.h"
+#include "timer_funcs.h"
+
 
 struct local_timer {
 	/* private timer information */

+ 4 - 0
tcp_conn.h

@@ -33,6 +33,7 @@
  *  2006-10-13  added tcp_req_states for STUN (vlada)
  *  2007-07-26  improved tcp connection hash function; increased aliases
  *               hash size (andrei)
+ *  2007-11-26  switched to local_timer (andrei)
  */
 
 
@@ -44,6 +45,7 @@
 #include "locking.h"
 #include "atomic_ops.h"
 #include "timer_ticks.h"
+#include "timer.h"
 
 /* maximum number of port aliases x search wildcard possibilities */
 #define TCP_CON_MAX_ALIASES (4*3) 
@@ -64,6 +66,7 @@
 /* tcp connection flags */
 #define F_CONN_NON_BLOCKING 1
 #define F_CONN_REMOVED      2 /* no longer  in "main" listen fd list */
+#define F_CONN_READER       4 /* handled by a tcp reader */
 
 
 enum tcp_req_errors {	TCP_REQ_INIT, TCP_REQ_OK, TCP_READ_ERROR,
@@ -132,6 +135,7 @@ struct tcp_connection{
 	int flags; /* connection related flags */
 	enum tcp_conn_states state; /* connection state */
 	void* extra_data; /* extra data associated to the connection, 0 for tcp*/
+	struct timer_ln timer;
 	unsigned int timeout;/* connection timeout, after this it will be removed*/
 	unsigned id_hash; /* hash index in the id_hash */
 	struct tcp_connection* id_next; /* next, prev in id hash table */

+ 202 - 74
tcp_main.c

@@ -83,6 +83,7 @@
  *               connect/ new sockets (andrei)
  *  2007-11-22  always add the connection & clear the coresponding flags before
  *               io_watch_add-ing its fd - it's safer this way (andrei)
+ *  2007-11-26  improved tcp timers: switched to local_timer (andrei)
  */
 
 
@@ -130,6 +131,7 @@
 #include "tcp_init.h"
 #include "tsend.h"
 #include "timer_ticks.h"
+#include "local_timer.h"
 #ifdef CORE_TLS
 #include "tls/tls_server.h"
 #define tls_loaded() 1
@@ -167,8 +169,9 @@
 
 /* maximum accepted lifetime (maximum possible is  ~ MAXINT/2) */
 #define MAX_TCP_CON_LIFETIME	((1U<<(sizeof(ticks_t)*8-1))-1)
-/* minimum interval tcpconn_timeout() is allowed to run, in ticks */
-#define TCPCONN_TIMEOUT_MIN_RUN S_TO_TICKS(1)  /* once per s */
+/* minimum interval local_timer_run() is allowed to run, in ticks */
+#define TCPCONN_TIMEOUT_MIN_RUN 1  /* once per tick */
+#define TCPCONN_WAIT_TIMEOUT 1 /* 1 tick */
 
 enum fd_types { F_NONE, F_SOCKINFO /* a tcp_listen fd */,
 				F_TCPCONN, F_TCPCHILD, F_PROC };
@@ -213,8 +216,11 @@ static int tcp_proto_no=-1; /* tcp protocol number as returned by
 
 static io_wait_h io_h;
 
+static struct local_timer tcp_main_ltimer;
 
 
+static ticks_t tcpconn_main_timeout(ticks_t , struct timer_ln* , void* );
+
 inline static int _tcpconn_add_alias_unsafe(struct tcp_connection* c, int port,
 										struct ip_addr* l_ip, int l_port,
 										int flags);
@@ -531,6 +537,7 @@ struct tcp_connection* tcpconn_new(int sock, union sockaddr_union* su,
 	c->rcv.src_su=*su;
 	
 	atomic_set(&c->refcnt, 0);
+	timer_init(&c->timer, tcpconn_main_timeout, c, 0);
 	su2ip_addr(&c->rcv.src_ip, su);
 	c->rcv.src_port=su_getport(su);
 	c->rcv.bind_address=ba;
@@ -697,8 +704,7 @@ inline static struct tcp_connection*  tcpconn_add(struct tcp_connection *c)
 }
 
 
-/* unsafe tcpconn_rm version (nolocks) */
-void _tcpconn_rm(struct tcp_connection* c)
+static inline void _tcpconn_detach(struct tcp_connection *c)
 {
 	int r;
 	tcpconn_listrm(tcpconn_id_hash[c->id_hash], c, id_next, id_prev);
@@ -706,15 +712,30 @@ void _tcpconn_rm(struct tcp_connection* c)
 	for (r=0; r<c->aliases; r++)
 		tcpconn_listrm(tcpconn_aliases_hash[c->con_aliases[r].hash], 
 						&c->con_aliases[r], next, prev);
+}
+
+
+
+static inline void _tcpconn_free(struct tcp_connection* c)
+{
 	lock_destroy(&c->write_lock);
 #ifdef USE_TLS
-	if (c->type==PROTO_TLS) tls_tcpconn_clean(c);
+	if (unlikely(c->type==PROTO_TLS)) tls_tcpconn_clean(c);
 #endif
 	shm_free(c);
 }
 
 
 
+/* unsafe tcpconn_rm version (nolocks) */
+void _tcpconn_rm(struct tcp_connection* c)
+{
+	_tcpconn_detach(c);
+	_tcpconn_free(c);
+}
+
+
+
 void tcpconn_rm(struct tcp_connection* c)
 {
 	int r;
@@ -1207,9 +1228,17 @@ error:
 static void tcpconn_destroy(struct tcp_connection* tcpconn)
 {
 	int fd;
+	ticks_t t;
 
+	/* always try to remove the timer to protect against tcpconn_destroy
+	 *  being called several times for the same connection 
+	 *  (if the timer is already removed, nothing happens) */
+	if (likely(!(tcpconn->flags & F_CONN_READER)))
+		local_timer_del(&tcp_main_ltimer, &tcpconn->timer);
 	TCPCONN_LOCK; /*avoid races w/ tcp_send*/
-	if (atomic_dec_and_test(&tcpconn->refcnt)){ 
+	if (likely(atomic_dec_and_test(&tcpconn->refcnt))){ 
+		_tcpconn_detach(tcpconn);
+		TCPCONN_UNLOCK;
 		DBG("tcpconn_destroy: destroying connection %p, flags %04x\n",
 				tcpconn, tcpconn->flags);
 		fd=tcpconn->s;
@@ -1218,18 +1247,26 @@ static void tcpconn_destroy(struct tcp_connection* tcpconn)
 		if (tcpconn->type==PROTO_TLS)
 			tls_close(tcpconn, fd);
 #endif
-		_tcpconn_rm(tcpconn);
+		_tcpconn_free(tcpconn);
 		close(fd);
 		(*tcp_connections_no)--;
 	}else{
+		TCPCONN_UNLOCK;
 		/* force timeout */
-		tcpconn->timeout=get_ticks_raw();
+		t=get_ticks_raw();
+		tcpconn->timeout=t+TCPCONN_WAIT_TIMEOUT;
 		tcpconn->state=S_CONN_BAD;
+		if (!(tcpconn->flags & F_CONN_READER)){
+			/* re-activate the timer only if the connection is handled
+			 * by tcp_main (and not by a tcp reader)*/
+			tcpconn->timer.f=tcpconn_main_timeout;
+			timer_reinit(&tcpconn->timer);
+			local_timer_add(&tcp_main_ltimer, &tcpconn->timer, 
+									TCPCONN_WAIT_TIMEOUT, t);
+		}
 		DBG("tcpconn_destroy: delaying (%p, flags %04x) ...\n",
 				tcpconn, tcpconn->flags);
-		
 	}
-	TCPCONN_UNLOCK;
 }
 
 
@@ -1308,7 +1345,7 @@ inline static int send_fd_queue_add(	struct tcp_send_fd_q* q,
 		if (new_size< MAX_SEND_FD_QUEUE_SIZE/2){
 			new_size*=2;
 		}else new_size=MAX_SEND_FD_QUEUE_SIZE;
-		if (q->crt>=&q->data[new_size]){
+		if (unlikely(q->crt>=&q->data[new_size])){
 			LOG(L_ERR, "ERROR: send_fd_queue_add: queue full: %ld/%ld\n",
 					(long)(q->crt-&q->data[0]-1), new_size);
 			goto error;
@@ -1316,7 +1353,7 @@ inline static int send_fd_queue_add(	struct tcp_send_fd_q* q,
 		LOG(L_CRIT, "INFO: send_fd_queue: queue full: %ld, extending to %ld\n",
 				(long)(q->end-&q->data[0]), new_size);
 		tmp=pkg_realloc(q->data, new_size*sizeof(struct send_fd_info));
-		if (tmp==0){
+		if (unlikely(tmp==0)){
 			LOG(L_ERR, "ERROR: send_fd_queue_add: out of memory\n");
 			goto error;
 		}
@@ -1342,8 +1379,8 @@ inline static void send_fd_queue_run(struct tcp_send_fd_q* q)
 	struct send_fd_info* t;
 	
 	for (p=t=&q->data[0]; p<q->crt; p++){
-		if (send_fd(p->unix_sock, &(p->tcp_conn),
-					sizeof(struct tcp_connection*), p->tcp_conn->s)<=0){
+		if (unlikely(send_fd(p->unix_sock, &(p->tcp_conn),
+					sizeof(struct tcp_connection*), p->tcp_conn->s)<=0)){
 			if ( ((errno==EAGAIN)||(errno==EWOULDBLOCK)) && 
 							((s_ticks_t)(p->expire-get_ticks_raw())>0)){
 				/* leave in queue for a future try */
@@ -1384,8 +1421,9 @@ inline static int handle_tcp_child(struct tcp_child* tcp_c, int fd_i)
 	long response[2];
 	int cmd;
 	int bytes;
+	ticks_t t;
 	
-	if (tcp_c->unix_sock<=0){
+	if (unlikely(tcp_c->unix_sock<=0)){
 		/* (we can't have a fd==0, 0 is never closed )*/
 		LOG(L_CRIT, "BUG: handle_tcp_child: fd %d for %d "
 				"(pid %d, ser no %d)\n", tcp_c->unix_sock,
@@ -1395,7 +1433,7 @@ inline static int handle_tcp_child(struct tcp_child* tcp_c, int fd_i)
 	/* read until sizeof(response)
 	 * (this is a SOCK_STREAM so read is not atomic) */
 	bytes=recv_all(tcp_c->unix_sock, response, sizeof(response), MSG_DONTWAIT);
-	if (bytes<(int)sizeof(response)){
+	if (unlikely(bytes<(int)sizeof(response))){
 		if (bytes==0){
 			/* EOF -> bad, child has died */
 			DBG("DBG: handle_tcp_child: dead tcp child %d (pid %d, no %d)"
@@ -1432,7 +1470,7 @@ inline static int handle_tcp_child(struct tcp_child* tcp_c, int fd_i)
 					response[0], response[1], (int)(tcp_c-&tcp_children[0]));
 	cmd=response[1];
 	tcpconn=(struct tcp_connection*)response[0];
-	if (tcpconn==0){
+	if (unlikely(tcpconn==0)){
 		/* should never happen */
 		LOG(L_CRIT, "BUG: handle_tcp_child: null tcpconn pointer received"
 				 " from tcp child %d (pid %d): %lx, %lx\n",
@@ -1443,16 +1481,28 @@ inline static int handle_tcp_child(struct tcp_child* tcp_c, int fd_i)
 	switch(cmd){
 		case CONN_RELEASE:
 			tcp_c->busy--;
-			if (tcpconn->state==S_CONN_BAD){ 
+			if (unlikely(tcpconn->state==S_CONN_BAD)){ 
 				tcpconn_destroy(tcpconn);
 				break;
 			}
 			/* update the timeout*/
-			tcpconn->timeout=get_ticks_raw()+tcp_con_lifetime;
+			t=get_ticks_raw();
+			tcpconn->timeout=t+tcp_con_lifetime;
 			tcpconn_put(tcpconn);
+			/* re-activate the timer */
+			tcpconn->timer.f=tcpconn_main_timeout;
+			timer_reinit(&tcpconn->timer);
+			local_timer_add(&tcp_main_ltimer, &tcpconn->timer, 
+								tcp_con_lifetime, t);
 			/* must be after the de-ref*/
-			tcpconn->flags&=~F_CONN_REMOVED;
-			io_watch_add(&io_h, tcpconn->s, F_TCPCONN, tcpconn);
+			tcpconn->flags&=~(F_CONN_REMOVED|F_CONN_READER);
+			if (unlikely(
+					io_watch_add(&io_h, tcpconn->s, F_TCPCONN, tcpconn)<0)){
+				LOG(L_CRIT, "ERROR: tcp_main: handle_tcp_child: failed to add"
+						" new socket to the fd list\n");
+				tcpconn->flags|=F_CONN_REMOVED;
+				tcpconn_destroy(tcpconn); /* closes also the fd */
+			}
 			DBG("handle_tcp_child: CONN_RELEASE  %p refcnt= %d\n", 
 							tcpconn, atomic_get(&tcpconn->refcnt));
 			break;
@@ -1501,9 +1551,10 @@ inline static int handle_ser_child(struct process_table* p, int fd_i)
 	int bytes;
 	int ret;
 	int fd;
+	ticks_t t;
 	
 	ret=-1;
-	if (p->unix_sock<=0){
+	if (unlikely(p->unix_sock<=0)){
 		/* (we can't have a fd==0, 0 is never closed )*/
 		LOG(L_CRIT, "BUG: handle_ser_child: fd %d for %d "
 				"(pid %d)\n", p->unix_sock, (int)(p-&pt[0]), p->pid);
@@ -1514,7 +1565,7 @@ inline static int handle_ser_child(struct process_table* p, int fd_i)
 	 * (this is a SOCK_STREAM so read is not atomic) */
 	bytes=receive_fd(p->unix_sock, response, sizeof(response), &fd,
 						MSG_DONTWAIT);
-	if (bytes<(int)sizeof(response)){
+	if (unlikely(bytes<(int)sizeof(response))){
 		/* too few bytes read */
 		if (bytes==0){
 			/* EOF -> bad, child has died */
@@ -1551,7 +1602,7 @@ inline static int handle_ser_child(struct process_table* p, int fd_i)
 					response[0], response[1], fd, (int)(p-&pt[0]), p->pid);
 	cmd=response[1];
 	tcpconn=(struct tcp_connection*)response[0];
-	if (tcpconn==0){
+	if (unlikely(tcpconn==0)){
 		LOG(L_CRIT, "BUG: handle_ser_child: null tcpconn pointer received"
 				 " from child %d (pid %d): %lx, %lx\n",
 				 	(int)(p-&pt[0]), p->pid, response[0], response[1]) ;
@@ -1569,8 +1620,8 @@ inline static int handle_ser_child(struct process_table* p, int fd_i)
 			/* send the requested FD  */
 			/* WARNING: take care of setting refcnt properly to
 			 * avoid race condition */
-			if (send_fd(p->unix_sock, &tcpconn, sizeof(tcpconn),
-							tcpconn->s)<=0){
+			if (unlikely(send_fd(p->unix_sock, &tcpconn, sizeof(tcpconn),
+								tcpconn->s)<=0)){
 				LOG(L_ERR, "ERROR: handle_ser_child: send_fd failed\n");
 			}
 			break;
@@ -1578,7 +1629,7 @@ inline static int handle_ser_child(struct process_table* p, int fd_i)
 			/* update the fd in the requested tcpconn*/
 			/* WARNING: take care of setting refcnt properly to
 			 * avoid race condition */
-			if (fd==-1){
+			if (unlikely(fd==-1)){
 				LOG(L_CRIT, "BUG: handle_ser_child: CONN_NEW:"
 							" no fd received\n");
 				break;
@@ -1588,9 +1639,19 @@ inline static int handle_ser_child(struct process_table* p, int fd_i)
 			/* add tcpconn to the list*/
 			tcpconn_add(tcpconn);
 			/* update the timeout*/
-			tcpconn->timeout=get_ticks_raw()+tcp_con_lifetime;
+			t=get_ticks_raw();
+			tcpconn->timeout=t+tcp_con_lifetime;
+			/* activate the timer (already properly init. in tcpconn_new() */
+			local_timer_add(&tcp_main_ltimer, &tcpconn->timer, 
+								tcp_con_lifetime, t);
 			tcpconn->flags&=~F_CONN_REMOVED;
-			io_watch_add(&io_h, tcpconn->s, F_TCPCONN, tcpconn);
+			if (unlikely(
+					io_watch_add(&io_h, tcpconn->s, F_TCPCONN, tcpconn)<0)){
+				LOG(L_CRIT, "ERROR: tcp_main: handle_ser_child: failed to add"
+						" new socket to the fd list\n");
+				tcpconn->flags|=F_CONN_REMOVED;
+				tcpconn_destroy(tcpconn); /* closes also the fd */
+			}
 			break;
 		default:
 			LOG(L_CRIT, "BUG: handle_ser_child: unknown cmd %d\n", cmd);
@@ -1630,7 +1691,7 @@ inline static int send2child(struct tcp_connection* tcpconn)
 	
 	tcp_children[idx].busy++;
 	tcp_children[idx].n_reqs++;
-	if (min_busy){
+	if (unlikely(min_busy)){
 		DBG("WARNING: send2child: no free tcp receiver, "
 				" connection passed to the least busy one (%d)\n",
 				min_busy);
@@ -1649,8 +1710,8 @@ inline static int send2child(struct tcp_connection* tcpconn)
 		
 #ifdef SEND_FD_QUEUE
 	/* if queue full, try to queue the io */
-	if (send_fd(tcp_children[idx].unix_sock, &tcpconn, sizeof(tcpconn),
-			tcpconn->s)<=0){
+	if (unlikely(send_fd(tcp_children[idx].unix_sock, &tcpconn,
+							sizeof(tcpconn), tcpconn->s)<=0)){
 		if ((errno==EAGAIN)||(errno==EWOULDBLOCK)){
 			/* FIXME: remove after debugging */
 			 LOG(L_CRIT, "INFO: tcp child %d, socket %d: queue full,"
@@ -1668,8 +1729,8 @@ inline static int send2child(struct tcp_connection* tcpconn)
 		}
 	}
 #else
-	if (send_fd(tcp_children[idx].unix_sock, &tcpconn, sizeof(tcpconn),
-			tcpconn->s)<=0){
+	if (unlikely(send_fd(tcp_children[idx].unix_sock, &tcpconn,
+						sizeof(tcpconn), tcpconn->s)<=0)){
 		LOG(L_ERR, "ERROR: send2child: send_fd failed\n");
 		return -1;
 	}
@@ -1700,20 +1761,20 @@ static inline int handle_new_connect(struct socket_info* si)
 	/* got a connection on r */
 	su_len=sizeof(su);
 	new_sock=accept(si->socket, &(su.s), &su_len);
-	if (new_sock==-1){
+	if (unlikely(new_sock==-1)){
 		if ((errno==EAGAIN)||(errno==EWOULDBLOCK))
 			return 0;
 		LOG(L_ERR,  "WARNING: handle_new_connect: error while accepting"
 				" connection(%d): %s\n", errno, strerror(errno));
 		return -1;
 	}
-	if (*tcp_connections_no>=tcp_max_connections){
+	if (unlikely(*tcp_connections_no>=tcp_max_connections)){
 		LOG(L_ERR, "ERROR: maximum number of connections exceeded: %d/%d\n",
 					*tcp_connections_no, tcp_max_connections);
 		close(new_sock);
 		return 1; /* success, because the accept was succesfull */
 	}
-	if (init_sock_opt_accept(new_sock)<0){
+	if (unlikely(init_sock_opt_accept(new_sock)<0)){
 		LOG(L_ERR, "ERROR: handle_new_connect: init_sock_opt failed\n");
 		close(new_sock);
 		return 1; /* success, because the accept was succesfull */
@@ -1721,7 +1782,7 @@ static inline int handle_new_connect(struct socket_info* si)
 	(*tcp_connections_no)++;
 	
 	dst_su=&si->su;
-	if (si->flags & SI_IS_ANY){
+	if (unlikely(si->flags & SI_IS_ANY)){
 		/* INADDR_ANY => get local dst */
 		sock_name_len=sizeof(sock_name);
 		if (getsockname(new_sock, &sock_name.s, &sock_name_len)!=0){
@@ -1735,11 +1796,19 @@ static inline int handle_new_connect(struct socket_info* si)
 	}
 	/* add socket to list */
 	tcpconn=tcpconn_new(new_sock, &su, dst_su, si, si->proto, S_CONN_ACCEPT);
-	if (tcpconn){
+	if (likely(tcpconn)){
 #ifdef TCP_PASS_NEW_CONNECTION_ON_DATA
 		tcpconn_add(tcpconn);
+		/* activate the timer */
+		local_timer_add(&tcp_main_ltimer, &tcpconn->timer, 
+								tcp_con_lifetime, get_ticks_raw());
 		tcpconn->flags&=~F_CONN_REMOVED;
-		io_watch_add(&io_h, tcpconn->s, F_TCPCONN, tcpconn);
+		if (unlikely(io_watch_add(&io_h, tcpconn->s, F_TCPCONN, tcpconn)<0)){
+			LOG(L_CRIT, "ERROR: tcp_main: handle_new_connect: failed to add"
+						" new socket to the fd list\n");
+			tcpconn->flags|=F_CONN_REMOVED;
+			tcpconn_destroy(tcpconn); /* closes also the fd */
+		}
 #else
 		atomic_set(&tcpconn->refcnt, 1); /* safe, not yet available to the
 											outside world */
@@ -1747,7 +1816,8 @@ static inline int handle_new_connect(struct socket_info* si)
 		DBG("handle_new_connect: new connection: %p %d flags: %04x\n",
 			tcpconn, tcpconn->s, tcpconn->flags);
 		/* pass it to a child */
-		if(send2child(tcpconn)<0){
+		tcpconn->flags|=F_CONN_READER;
+		if(unlikely(send2child(tcpconn)<0)){
 			LOG(L_ERR,"ERROR: handle_new_connect: no children "
 					"available\n");
 			tcpconn_destroy(tcpconn);
@@ -1793,12 +1863,14 @@ inline static int handle_tcpconn_ev(struct tcp_connection* tcpconn, int fd_i)
 		return -1;
 	}
 #endif
-	/* pass it to child, so remove it from the io watch list */
+	/* pass it to child, so remove it from the io watch list  and the local
+	 *  timer */
 	DBG("handle_tcpconn_ev: data available on %p %d\n", tcpconn, tcpconn->s);
-	if (io_watch_del(&io_h, tcpconn->s, fd_i, 0)==-1) goto error;
-	tcpconn->flags|=F_CONN_REMOVED;
+	if (unlikely(io_watch_del(&io_h, tcpconn->s, fd_i, 0)==-1)) goto error;
+	tcpconn->flags|=F_CONN_REMOVED|F_CONN_READER;
+	local_timer_del(&tcp_main_ltimer, &tcpconn->timer);
 	tcpconn_ref(tcpconn); /* refcnt ++ */
-	if (send2child(tcpconn)<0){
+	if (unlikely(send2child(tcpconn)<0)){
 		LOG(L_ERR,"ERROR: handle_tcpconn_ev: no children available\n");
 		tcpconn_destroy(tcpconn);
 	}
@@ -1855,40 +1927,91 @@ error:
 
 
 
-/* very inefficient for now - FIXME
- * keep in sync with tcpconn_destroy, the "delete" part should be
+/* timer handler for tcpconnection handled by tcp_main */
+static ticks_t tcpconn_main_timeout(ticks_t t, struct timer_ln* tl, void* data)
+{
+	struct tcp_connection *c;
+	int fd;
+	
+	c=(struct tcp_connection*)data; 
+	/* or (struct tcp...*)(tl-offset(c->timer)) */
+	
+	if (TICKS_LT(t, c->timeout)){
+		/* timeout extended, exit */
+		return (ticks_t)(c->timeout - t);
+	}
+	if (likely(atomic_get(&c->refcnt)==0)){
+		TCPCONN_LOCK;
+			/* check again to avoid races with tcp_send() */
+			if (likely(atomic_get(&c->refcnt)==0)){
+				/* delete */
+				_tcpconn_detach(c);
+				TCPCONN_UNLOCK; /* unlock as soon as possible */
+				fd=c->s;
+				if (likely(fd>0)){
+					if (likely(!(c->flags & F_CONN_REMOVED))){
+						io_watch_del(&io_h, fd, -1, IO_FD_CLOSING);
+						c->flags|=F_CONN_REMOVED;
+					}
+#ifdef USE_TLS
+					if (unlikely(c->type==PROTO_TLS ))
+						tls_close(c, fd);
+#endif /* USE_TLS */
+					_tcpconn_free(c);
+					close(fd);
+				}
+				(*tcp_connections_no)--; /* modified only in tcp_main
+											 => no lock needed */
+				return 0; /* don't prolong the timer anymore */
+			}
+		TCPCONN_UNLOCK;
+	}
+	/* if we are here we can't delete the connection, it's still referenced
+	 *  => we just delay deleting it */
+	return TCPCONN_WAIT_TIMEOUT;
+}
+
+
+
+static inline void tcp_timer_run()
+{
+	ticks_t ticks;
+	static ticks_t prev_ticks=0;
+	
+	ticks=get_ticks_raw();
+	if (unlikely((ticks-prev_ticks)<TCPCONN_TIMEOUT_MIN_RUN)) return;
+	prev_ticks=ticks;
+	local_timer_run(&tcp_main_ltimer, ticks);
+}
+
+
+
+/* keep in sync with tcpconn_destroy, the "delete" part should be
  * the same except for io_watch_del..
- * Note: this function is called only from the tcp_main process with 1 
- * exception: on shutdown it's called also by the main ser process via
- * cleanup() => with the ser shutdown exception, it cannot execute in parallel
+ * Note: this function is called only on shutdown by the main ser process via
+ * cleanup(). However it's also safe to call it from the tcp_main process.
+ * => with the ser shutdown exception, it cannot execute in parallel
  * with tcpconn_add() or tcpconn_destroy()*/
-static inline void tcpconn_timeout(int force)
+static inline void tcpconn_destroy_all()
 {
-	static ticks_t prev_ticks=0;
 	struct tcp_connection *c, *next;
-	ticks_t ticks;
 	unsigned h;
 	int fd;
 	
 	
-	ticks=get_ticks_raw();
-	if (((ticks-prev_ticks)<TCPCONN_TIMEOUT_MIN_RUN) && !force) return;
-	prev_ticks=ticks;
-	TCPCONN_LOCK; /* fixme: we can lock only on delete IMO */
+	TCPCONN_LOCK; 
 	for(h=0; h<TCP_ID_HASH_SIZE; h++){
 		c=tcpconn_id_hash[h];
 		while(c){
 			next=c->id_next;
-			if (force ||((atomic_get(&c->refcnt)==0) &&
-						((s_ticks_t)(ticks-c->timeout)>=0))){
-				if (!force)
-					DBG("tcpconn_timeout: timeout for hash=%d - %p"
-							" (%d > %d)\n", h, c, ticks, c->timeout);
-				if (c->s>0 && is_tcp_main){
+				if (is_tcp_main){
 					/* we cannot close or remove the fd if we are not in the
 					 * tcp main proc.*/
+					if (!(c->flags & F_CONN_READER))
+						local_timer_del(&tcp_main_ltimer, &c->timer);
+					/* else still in some reader */
 					fd=c->s;
-					if (!(c->flags & F_CONN_REMOVED)){
+					if (fd>0 && !(c->flags & F_CONN_REMOVED)){
 						io_watch_del(&io_h, fd, -1, IO_FD_CLOSING);
 						c->flags|=F_CONN_REMOVED;
 					}
@@ -1896,7 +2019,7 @@ static inline void tcpconn_timeout(int force)
 					fd=-1;
 				}
 #ifdef USE_TLS
-				if (c->type==PROTO_TLS)
+				if (fd>0 && c->type==PROTO_TLS)
 					tls_close(c, fd);
 #endif
 				_tcpconn_rm(c);
@@ -1904,7 +2027,6 @@ static inline void tcpconn_timeout(int force)
 					close(fd);
 				}
 				(*tcp_connections_no)--;
-			}
 			c=next;
 		}
 	}
@@ -1994,7 +2116,7 @@ void tcp_main_loop()
 				io_wait_loop_poll(&io_h, TCP_MAIN_SELECT_TIMEOUT, 0); 
 				send_fd_queue_run(&send2child_q); /* then new io */
 				/* remove old connections */
-				tcpconn_timeout(0);
+				tcp_timer_run();
 			}
 			break;
 #ifdef HAVE_SELECT
@@ -2002,7 +2124,7 @@ void tcp_main_loop()
 			while(1){
 				io_wait_loop_select(&io_h, TCP_MAIN_SELECT_TIMEOUT, 0);
 				send_fd_queue_run(&send2child_q); /* then new io */
-				tcpconn_timeout(0);
+				tcp_timer_run();
 			}
 			break;
 #endif
@@ -2011,7 +2133,7 @@ void tcp_main_loop()
 			while(1){
 				io_wait_loop_sigio_rt(&io_h, TCP_MAIN_SELECT_TIMEOUT);
 				send_fd_queue_run(&send2child_q); /* then new io */
-				tcpconn_timeout(0);
+				tcp_timer_run();
 			}
 			break;
 #endif
@@ -2020,14 +2142,14 @@ void tcp_main_loop()
 			while(1){
 				io_wait_loop_epoll(&io_h, TCP_MAIN_SELECT_TIMEOUT, 0);
 				send_fd_queue_run(&send2child_q); /* then new io */
-				tcpconn_timeout(0);
+				tcp_timer_run();
 			}
 			break;
 		case POLL_EPOLL_ET:
 			while(1){
 				io_wait_loop_epoll(&io_h, TCP_MAIN_SELECT_TIMEOUT, 1);
 				send_fd_queue_run(&send2child_q); /* then new io */
-				tcpconn_timeout(0);
+				tcp_timer_run();
 			}
 			break;
 #endif
@@ -2036,7 +2158,7 @@ void tcp_main_loop()
 			while(1){
 				io_wait_loop_kqueue(&io_h, TCP_MAIN_SELECT_TIMEOUT, 0);
 				send_fd_queue_run(&send2child_q); /* then new io */
-				tcpconn_timeout(0);
+				tcp_timer_run();
 			}
 			break;
 #endif
@@ -2045,7 +2167,7 @@ void tcp_main_loop()
 			while(1){
 				io_wait_loop_devpoll(&io_h, TCP_MAIN_SELECT_TIMEOUT, 0);
 				send_fd_queue_run(&send2child_q); /* then new io */
-				tcpconn_timeout(0);
+				tcp_timer_run();
 			}
 			break;
 #endif
@@ -2075,7 +2197,7 @@ void destroy_tcp()
 								   some process was terminated while holding 
 								   it; this will allow an almost gracious 
 								   shutdown */
-			tcpconn_timeout(1); /* force close/expire for all active tcpconns*/
+			tcpconn_destroy_all(); 
 			shm_free(tcpconn_id_hash);
 			tcpconn_id_hash=0;
 		}
@@ -2100,6 +2222,7 @@ void destroy_tcp()
 			pkg_free(tcp_children);
 			tcp_children=0;
 		}
+		destroy_local_timer(&tcp_main_ltimer);
 }
 
 
@@ -2189,6 +2312,11 @@ int init_tcp()
 					poll_method_name(tcp_poll_method));
 	}
 	
+	if (init_local_timer(&tcp_main_ltimer, get_ticks_raw())!=0){
+		LOG(L_ERR, "ERROR: init_tcp: failed to init local timer\n");
+		goto error;
+	}
+	
 	return 0;
 error:
 	/* clean-up */

+ 52 - 176
tcp_read.c

@@ -37,6 +37,7 @@
  * 2006-02-03  use tsend_stream instead of send_all (andrei)
  * 2006-10-13  added STUN support - state machine for TCP (vlada)
  * 2007-02-20  fixed timeout calc. bug (andrei)
+ * 2007-11-26  improved tcp timers: switched to local_timer (andrei)
  */
 
 #ifdef USE_TCP
@@ -61,6 +62,7 @@
 #include "globals.h"
 #include "receive.h"
 #include "timer.h"
+#include "local_timer.h"
 #include "ut.h"
 #ifdef CORE_TLS
 #include "tls/tls_server.h"
@@ -80,6 +82,8 @@ int is_msg_complete(struct tcp_req* r);
 
 #endif /* USE_STUN */
 
+#define TCPCONN_TIMEOUT_MIN_RUN  1 /* run the timers each new tick */
+
 /* types used in io_wait* */
 enum fd_types { F_NONE, F_TCPMAIN, F_TCPCONN };
 
@@ -88,6 +92,8 @@ static struct tcp_connection* tcp_conn_lst=0;
 static io_wait_h io_w; /* io_wait handler*/
 static int tcpmain_sock=-1;
 
+static struct local_timer tcp_reader_ltimer;
+
 
 /* reads next available bytes
  * return number of bytes read, 0 on EOF or -1 on error,
@@ -675,146 +681,25 @@ void release_tcpconn(struct tcp_connection* c, long state, int unix_sock)
 }
 
 
-#ifdef DEBUG_TCP_RECEIVE
-/* old code known to work, kept arround for debuging */
-void tcp_receive_loop(int unix_sock)
+
+static ticks_t tcpconn_read_timeout(ticks_t t, struct timer_ln* tl, void* data)
 {
-	struct tcp_connection* list; /* list with connections in use */
-	struct tcp_connection* con;
-	struct tcp_connection* c_next;
-	int n;
-	int nfds;
-	int s;
-	long resp;
-	fd_set master_set;
-	fd_set sel_set;
-	int maxfd;
-	struct timeval timeout;
-	ticks_t ticks;
+	struct tcp_connection *c;
 	
+	c=(struct tcp_connection*)data; 
+	/* or (struct tcp...*)(tl-offset(c->timer)) */
 	
-	/* init */
-	list=con=0;
-	FD_ZERO(&master_set);
-	FD_SET(unix_sock, &master_set);
-	maxfd=unix_sock;
-	
-	/* listen on the unix socket for the fd */
-	for(;;){
-			timeout.tv_sec=TCP_CHILD_SELECT_TIMEOUT;
-			timeout.tv_usec=0;
-			sel_set=master_set;
-			nfds=select(maxfd+1, &sel_set, 0 , 0 , &timeout);
-#ifdef EXTRA_DEBUG
-			for (n=0; n<maxfd; n++){
-				if (FD_ISSET(n, &sel_set)) 
-					DBG("tcp receive: FD %d is set\n", n);
-			}
-#endif
-			if (nfds<0){
-				if (errno==EINTR) continue; /* just a signal */
-				/* errors */
-				LOG(L_ERR, "ERROR: tcp_receive_loop: select:(%d) %s\n", errno,
-					strerror(errno));
-				continue;
-			}
-			if (FD_ISSET(unix_sock, &sel_set)){
-				nfds--;
-				/* a new conn from "main" */
-				n=receive_fd(unix_sock, &con, sizeof(con), &s, 0);
-				if (n<0){
-					if (errno == EWOULDBLOCK || errno == EAGAIN ||
-							errno == EINTR){
-						goto skip;
-					}else{
-						LOG(L_CRIT,"BUG: tcp_receive_loop: read_fd: %s\n",
-							strerror(errno));
-						abort(); /* big error*/
-					}
-				}
-				DBG("received n=%d con=%p, fd=%d\n", n, con, s);
-				if (n==0){
-					LOG(L_ERR, "WARNING: tcp_receive_loop: 0 bytes read\n");
-					goto skip;
-				}
-				if (con==0){
-					LOG(L_CRIT, "BUG: tcp_receive_loop: null pointer\n");
-					goto skip;
-				}
-				con->fd=s;
-				if (s==-1) {
-					LOG(L_ERR, "ERROR: tcp_receive_loop: read_fd:"
-									"no fd read\n");
-					resp=CONN_ERROR;
-					con->state=S_CONN_BAD;
-					release_tcpconn(con, resp, unix_sock);
-					goto skip;
-				}
-				con->timeout=get_ticks_raw()+S_TO_TICKS(TCP_CHILD_TIMEOUT);
-				FD_SET(s, &master_set);
-				if (maxfd<s) maxfd=s;
-				if (con==list){
-					LOG(L_CRIT, "BUG: tcp_receive_loop: duplicate"
-							" connection received: %p, id %d, fd %d, refcnt %d"
-							" state %d (n=%d)\n", con, con->id, con->fd,
-							atomic_get(&con->refcnt), con->state, n);
-					resp=CONN_ERROR;
-					release_tcpconn(con, resp, unix_sock);
-					goto skip; /* try to recover */
-				}
-				tcpconn_listadd(list, con, c_next, c_prev);
-			}
-skip:
-			ticks=get_ticks_raw();
-			for (con=list; con ; con=c_next){
-				c_next=con->c_next; /* safe for removing*/
-#ifdef EXTRA_DEBUG
-				DBG("tcp receive: list fd=%d, id=%d, timeout=%d, refcnt=%d\n",
-						con->fd, con->id, con->timeout,
-						atomic_get(&con->refcnt));
-#endif
-				if (con->state<0){
-					/* S_CONN_BAD or S_CONN_ERROR, remove it */
-					resp=CONN_ERROR;
-					FD_CLR(con->fd, &master_set);
-					tcpconn_listrm(list, con, c_next, c_prev);
-					con->state=S_CONN_BAD;
-					release_tcpconn(con, resp, unix_sock);
-					continue;
-				}
-				if (nfds && FD_ISSET(con->fd, &sel_set)){
-#ifdef EXTRA_DEBUG
-					DBG("tcp receive: match, fd:isset\n");
-#endif
-					nfds--;
-					resp=tcp_read_req(con);
-					
-					if (resp<0){
-						FD_CLR(con->fd, &master_set);
-						tcpconn_listrm(list, con, c_next, c_prev);
-						con->state=S_CONN_BAD;
-						release_tcpconn(con, resp, unix_sock);
-					}else{
-						/* update timeout */
-						con->timeout=ticks+TCP_CHILD_TIMEOUT;
-					}
-				}else{
-					/* timeout */
-					if ((s_ticks_t)(ticks-con->timeout)>=0){
-						/* expired, return to "tcp main" */
-						DBG("tcp_receive_loop: %p expired (%d, %d)\n",
-								con, con->timeout, ticks);
-						resp=CONN_RELEASE;
-						FD_CLR(con->fd, &master_set);
-						tcpconn_listrm(list, con, c_next, c_prev);
-						release_tcpconn(con, resp, unix_sock);
-					}
-				}
-			}
-		
+	if (likely(!(c->state<0) && TICKS_LT(t, c->timeout))){
+		/* timeout extended, exit */
+		return (ticks_t)(c->timeout - t);
 	}
+	/* if conn->state is ERROR or BAD => force timeout too */
+	io_watch_del(&io_w, c->fd, -1, IO_FD_CLOSING);
+	tcpconn_listrm(tcp_conn_lst, c, c_next, c_prev);
+	release_tcpconn(c, (c->state<0)?CONN_ERROR:CONN_RELEASE, tcpmain_sock);
+	
+	return 0;
 }
-#else /* DEBUG_TCP_RECEIVE */
 
 
 
@@ -838,13 +723,14 @@ inline static int handle_io(struct fd_map* fm, int idx)
 	struct tcp_connection* con;
 	int s;
 	long resp;
+	ticks_t t;
 	
 	switch(fm->type){
 		case F_TCPMAIN:
 again:
 			ret=n=receive_fd(fm->fd, &con, sizeof(con), &s, 0);
 			DBG("received n=%d con=%p, fd=%d\n", n, con, s);
-			if (n<0){
+			if (unlikely(n<0)){
 				if (errno == EWOULDBLOCK || errno == EAGAIN){
 					ret=0;
 					break;
@@ -855,21 +741,21 @@ again:
 						abort(); /* big error*/
 				}
 			}
-			if (n==0){
+			if (unlikely(n==0)){
 				LOG(L_ERR, "WARNING: tcp_receive: handle_io: 0 bytes read\n");
 				goto error;
 			}
-			if (con==0){
+			if (unlikely(con==0)){
 					LOG(L_CRIT, "BUG: tcp_receive: handle_io null pointer\n");
 					goto error;
 			}
 			con->fd=s;
-			if (s==-1) {
+			if (unlikely(s==-1)) {
 				LOG(L_ERR, "ERROR: tcp_receive: handle_io: read_fd:"
 									"no fd read\n");
 				goto con_error;
 			}
-			if (con==tcp_conn_lst){
+			if (unlikely(con==tcp_conn_lst)){
 				LOG(L_CRIT, "BUG: tcp_receive: handle_io: duplicate"
 							" connection received: %p, id %d, fd %d, refcnt %d"
 							" state %d (n=%d)\n", con, con->id, con->fd,
@@ -882,21 +768,29 @@ again:
 			 * handle_io might decide to del. the new connection =>
 			 * must be in the list */
 			tcpconn_listadd(tcp_conn_lst, con, c_next, c_prev);
-			con->timeout=get_ticks_raw()+S_TO_TICKS(TCP_CHILD_TIMEOUT);
-			if (io_watch_add(&io_w, s, F_TCPCONN, con)<0){
+			t=get_ticks_raw();
+			con->timeout=t+S_TO_TICKS(TCP_CHILD_TIMEOUT);
+			/* re-activate the timer */
+			con->timer.f=tcpconn_read_timeout;
+			timer_reinit(&con->timer);
+			local_timer_add(&tcp_reader_ltimer, &con->timer,
+								S_TO_TICKS(TCP_CHILD_TIMEOUT), t);
+			if (unlikely(io_watch_add(&io_w, s, F_TCPCONN, con))<0){
 				LOG(L_CRIT, "ERROR: tcp_receive: handle_io: failed to add"
 						" new socket to the fd list\n");
 				tcpconn_listrm(tcp_conn_lst, con, c_next, c_prev);
+				local_timer_del(&tcp_reader_ltimer, &con->timer);
 				goto con_error;
 			}
 			break;
 		case F_TCPCONN:
 			con=(struct tcp_connection*)fm->data;
 			resp=tcp_read_req(con, &ret);
-			if (resp<0){
+			if (unlikely(resp<0)){
 				ret=-1; /* some error occured */
 				io_watch_del(&io_w, con->fd, idx, IO_FD_CLOSING);
 				tcpconn_listrm(tcp_conn_lst, con, c_next, c_prev);
+				local_timer_del(&tcp_reader_ltimer, &con->timer);
 				con->state=S_CONN_BAD;
 				release_tcpconn(con, resp, tcpmain_sock);
 			}else{
@@ -925,35 +819,15 @@ error:
 
 
 
-/* releases expired connections and cleans up bad ones (state<0) */
-static inline void tcp_receive_timeout()
+inline static void tcp_reader_timer_run()
 {
-	struct tcp_connection* con;
-	struct tcp_connection* next;
 	ticks_t ticks;
+	static ticks_t prev_ticks=0;
 	
 	ticks=get_ticks_raw();
-	for (con=tcp_conn_lst; con; con=next){
-		next=con->c_next; /* safe for removing */
-		if (con->state<0){   /* kill bad connections */ 
-			/* S_CONN_BAD or S_CONN_ERROR, remove it */
-			/* fd will be closed in release_tcpconn */
-			io_watch_del(&io_w, con->fd, -1, IO_FD_CLOSING);
-			tcpconn_listrm(tcp_conn_lst, con, c_next, c_prev);
-			con->state=S_CONN_BAD;
-			release_tcpconn(con, CONN_ERROR, tcpmain_sock);
-			continue;
-		}
-		if ((s_ticks_t)(ticks-con->timeout)>=0){
-			/* expired, return to "tcp main" */
-			DBG("tcp_receive_loop: %p expired (%d, %d)\n",
-					con, con->timeout, ticks);
-			/* fd will be closed in release_tcpconn */
-			io_watch_del(&io_w, con->fd, -1, IO_FD_CLOSING);
-			tcpconn_listrm(tcp_conn_lst, con, c_next, c_prev);
-			release_tcpconn(con, CONN_RELEASE, tcpmain_sock);
-		}
-	}
+	if (unlikely((ticks-prev_ticks)<TCPCONN_TIMEOUT_MIN_RUN)) return;
+	prev_ticks=ticks;
+	local_timer_run(&tcp_reader_ltimer, ticks);
 }
 
 
@@ -965,6 +839,8 @@ void tcp_receive_loop(int unix_sock)
 	tcpmain_sock=unix_sock; /* init com. socket */
 	if (init_io_wait(&io_w, get_max_open_fds(), tcp_poll_method)<0)
 		goto error;
+	if (init_local_timer(&tcp_reader_ltimer, get_ticks_raw())!=0)
+		goto error;
 	/* add the unix socket */
 	if (io_watch_add(&io_w, tcpmain_sock, F_TCPMAIN, 0)<0){
 		LOG(L_CRIT, "ERROR: tcp_receive_loop: init: failed to add socket "
@@ -976,14 +852,14 @@ void tcp_receive_loop(int unix_sock)
 		case POLL_POLL:
 				while(1){
 					io_wait_loop_poll(&io_w, TCP_CHILD_SELECT_TIMEOUT, 0);
-					tcp_receive_timeout();
+					tcp_reader_timer_run();
 				}
 				break;
 #ifdef HAVE_SELECT
 		case POLL_SELECT:
 			while(1){
 				io_wait_loop_select(&io_w, TCP_CHILD_SELECT_TIMEOUT, 0);
-				tcp_receive_timeout();
+				tcp_reader_timer_run();
 			}
 			break;
 #endif
@@ -991,7 +867,7 @@ void tcp_receive_loop(int unix_sock)
 		case POLL_SIGIO_RT:
 			while(1){
 				io_wait_loop_sigio_rt(&io_w, TCP_CHILD_SELECT_TIMEOUT);
-				tcp_receive_timeout();
+				tcp_reader_timer_run();
 			}
 			break;
 #endif
@@ -999,13 +875,13 @@ void tcp_receive_loop(int unix_sock)
 		case POLL_EPOLL_LT:
 			while(1){
 				io_wait_loop_epoll(&io_w, TCP_CHILD_SELECT_TIMEOUT, 0);
-				tcp_receive_timeout();
+				tcp_reader_timer_run();
 			}
 			break;
 		case POLL_EPOLL_ET:
 			while(1){
 				io_wait_loop_epoll(&io_w, TCP_CHILD_SELECT_TIMEOUT, 1);
-				tcp_receive_timeout();
+				tcp_reader_timer_run();
 			}
 			break;
 #endif
@@ -1013,7 +889,7 @@ void tcp_receive_loop(int unix_sock)
 		case POLL_KQUEUE:
 			while(1){
 				io_wait_loop_kqueue(&io_w, TCP_CHILD_SELECT_TIMEOUT, 0);
-				tcp_receive_timeout();
+				tcp_reader_timer_run();
 			}
 			break;
 #endif
@@ -1021,7 +897,7 @@ void tcp_receive_loop(int unix_sock)
 		case POLL_DEVPOLL:
 			while(1){
 				io_wait_loop_devpoll(&io_w, TCP_CHILD_SELECT_TIMEOUT, 0);
-				tcp_receive_timeout();
+				tcp_reader_timer_run();
 			}
 			break;
 #endif
@@ -1037,7 +913,7 @@ error:
 	exit(-1);
 }
 
-#endif /* DEBUG_TCP_RECEIVE */
+
 
 #ifdef USE_STUN
 int is_msg_complete(struct tcp_req* r)