Browse Source

- tcp support for queueing writes: if some data cannot be written immediately
on the socket (socket buffers full or still connecting), the data will be
queued and written at a later time (max. queue size per socket is controlled
by tcp_conn_wq_max, timeout by tcp_send_timeout and total queued bytes / max.
mem. used by tcp_wq_max). By default disabled (experimental), to enable it
use tcp_buf_write=yes in ser.cfg. To compile without queueing support
use -DNO_TCP_BUF_WRITE.

Andrei Pelinescu-Onciul 17 years ago
parent
commit
885b9f62e1
11 changed files with 612 additions and 39 deletions
  1. 1 1
      Makefile.defs
  2. 9 0
      NEWS
  3. 9 0
      cfg.lex
  4. 27 0
      cfg.y
  5. 9 3
      core_cmd.c
  6. 0 1
      io_wait.h
  7. 27 2
      tcp_conn.h
  8. 2 0
      tcp_info.h
  9. 500 29
      tcp_main.c
  10. 17 3
      tcp_options.c
  11. 11 0
      tcp_options.h

+ 1 - 1
Makefile.defs

@@ -77,7 +77,7 @@ MAIN_NAME=ser
 VERSION = 2
 VERSION = 2
 PATCHLEVEL = 1
 PATCHLEVEL = 1
 SUBLEVEL =  0
 SUBLEVEL =  0
-EXTRAVERSION = -dev13
+EXTRAVERSION = -dev14
 
 
 SER_VER = $(shell expr $(VERSION) \* 1000000 + $(PATCHLEVEL) \* 1000 + \
 SER_VER = $(shell expr $(VERSION) \* 1000000 + $(PATCHLEVEL) \* 1000 + \
 			$(SUBLEVEL) )
 			$(SUBLEVEL) )

+ 9 - 0
NEWS

@@ -120,6 +120,15 @@ new config variables:
      will be cached inside the process calling tcp_send (performance increase
      will be cached inside the process calling tcp_send (performance increase
      for sending over tcp at the cost of slightly slower connection closing and
      for sending over tcp at the cost of slightly slower connection closing and
      extra FDs kept open)
      extra FDs kept open)
+  tcp_buf_write = yes | no (default no) - if enabled all the tcp writes that
+     would block / wait for connect to finish, will be queued and attempted
+     later (see also tcp_conn_wq_max and tcp_wq_max).
+  tcp_conn_wq_max = bytes (default 32 K) - maximum bytes queued for write
+     allowed per connection. Attempting to queue more bytes would result
+     in an error and in the connection being closed (too slow). If
+     tcp_buf_write is not enabled, it has no effect.
+  tcp_wq_max = bytes (default 10 Mb) - maximum bytes queued for write allowed
+     globally. It has no effect if tcp_buf_write is not enabled.
   tcp_defer_accept =  yes | no (default no) on freebsd  / number of seconds
   tcp_defer_accept =  yes | no (default no) on freebsd  / number of seconds
         before timeout on linux (default disabled) - tcp accepts will be 
         before timeout on linux (default disabled) - tcp accepts will be 
         delayed until some data is received (improves performance on proxies
         delayed until some data is received (improves performance on proxies

+ 9 - 0
cfg.lex

@@ -293,6 +293,9 @@ TCP_MAX_CONNECTIONS	"tcp_max_connections"
 TCP_SOURCE_IPV4		"tcp_source_ipv4"
 TCP_SOURCE_IPV4		"tcp_source_ipv4"
 TCP_SOURCE_IPV6		"tcp_source_ipv6"
 TCP_SOURCE_IPV6		"tcp_source_ipv6"
 TCP_OPT_FD_CACHE	"tcp_fd_cache"
 TCP_OPT_FD_CACHE	"tcp_fd_cache"
+TCP_OPT_BUF_WRITE	"tcp_buf_write"
+TCP_OPT_CONN_WQ_MAX	"tcp_conn_wq_max"
+TCP_OPT_WQ_MAX		"tcp_wq_max"
 TCP_OPT_DEFER_ACCEPT "tcp_defer_accept"
 TCP_OPT_DEFER_ACCEPT "tcp_defer_accept"
 TCP_OPT_DELAYED_ACK	"tcp_delayed_ack"
 TCP_OPT_DELAYED_ACK	"tcp_delayed_ack"
 TCP_OPT_SYNCNT		"tcp_syncnt"
 TCP_OPT_SYNCNT		"tcp_syncnt"
@@ -561,6 +564,12 @@ EAT_ABLE	[\ \t\b\r]
 									return TCP_SOURCE_IPV6; }
 									return TCP_SOURCE_IPV6; }
 <INITIAL>{TCP_OPT_FD_CACHE}		{ count(); yylval.strval=yytext;
 <INITIAL>{TCP_OPT_FD_CACHE}		{ count(); yylval.strval=yytext;
 									return TCP_OPT_FD_CACHE; }
 									return TCP_OPT_FD_CACHE; }
+<INITIAL>{TCP_OPT_CONN_WQ_MAX}	{ count(); yylval.strval=yytext;
+									return TCP_OPT_CONN_WQ_MAX; }
+<INITIAL>{TCP_OPT_WQ_MAX}	{ count(); yylval.strval=yytext;
+									return TCP_OPT_WQ_MAX; }
+<INITIAL>{TCP_OPT_BUF_WRITE}	{ count(); yylval.strval=yytext;
+									return TCP_OPT_BUF_WRITE; }
 <INITIAL>{TCP_OPT_DEFER_ACCEPT}	{ count(); yylval.strval=yytext;
 <INITIAL>{TCP_OPT_DEFER_ACCEPT}	{ count(); yylval.strval=yytext;
 									return TCP_OPT_DEFER_ACCEPT; }
 									return TCP_OPT_DEFER_ACCEPT; }
 <INITIAL>{TCP_OPT_DELAYED_ACK}	{ count(); yylval.strval=yytext;
 <INITIAL>{TCP_OPT_DELAYED_ACK}	{ count(); yylval.strval=yytext;

+ 27 - 0
cfg.y

@@ -334,6 +334,9 @@ static struct socket_id* mk_listen_id(char*, int, int);
 %token TCP_SOURCE_IPV4
 %token TCP_SOURCE_IPV4
 %token TCP_SOURCE_IPV6
 %token TCP_SOURCE_IPV6
 %token TCP_OPT_FD_CACHE
 %token TCP_OPT_FD_CACHE
+%token TCP_OPT_BUF_WRITE
+%token TCP_OPT_CONN_WQ_MAX
+%token TCP_OPT_WQ_MAX
 %token TCP_OPT_DEFER_ACCEPT
 %token TCP_OPT_DEFER_ACCEPT
 %token TCP_OPT_DELAYED_ACK
 %token TCP_OPT_DELAYED_ACK
 %token TCP_OPT_SYNCNT
 %token TCP_OPT_SYNCNT
@@ -803,6 +806,30 @@ assign_stm:
 		#endif
 		#endif
 	}
 	}
 	| TCP_OPT_FD_CACHE EQUAL error { yyerror("boolean value expected"); }
 	| TCP_OPT_FD_CACHE EQUAL error { yyerror("boolean value expected"); }
+	/* tcp_buf_write = yes | no : enables queueing of the tcp writes */
+	| TCP_OPT_BUF_WRITE EQUAL NUMBER {
+		#ifdef USE_TCP
+			tcp_options.tcp_buf_write=$3;
+		#else
+			warn("tcp support not compiled in");
+		#endif
+	}
+	| TCP_OPT_BUF_WRITE EQUAL error { yyerror("boolean value expected"); }
+	/* tcp_conn_wq_max = bytes : per connection write queue limit */
+	| TCP_OPT_CONN_WQ_MAX EQUAL NUMBER {
+		#ifdef USE_TCP
+			tcp_options.tcpconn_wq_max=$3;
+		#else
+			warn("tcp support not compiled in");
+		#endif
+	}
+	| TCP_OPT_CONN_WQ_MAX EQUAL error { yyerror("number expected"); }
+	/* tcp_wq_max = bytes : global write queue limit */
+	| TCP_OPT_WQ_MAX EQUAL NUMBER {
+		#ifdef USE_TCP
+			tcp_options.tcp_wq_max=$3;
+		#else
+			warn("tcp support not compiled in");
+		#endif
+	}
+	| TCP_OPT_WQ_MAX EQUAL error { yyerror("number expected"); }
 	| TCP_OPT_DEFER_ACCEPT EQUAL NUMBER {
 	| TCP_OPT_DEFER_ACCEPT EQUAL NUMBER {
 		#ifdef USE_TCP
 		#ifdef USE_TCP
 			tcp_options.defer_accept=$3;
 			tcp_options.defer_accept=$3;

+ 9 - 3
core_cmd.c

@@ -532,10 +532,11 @@ static void core_tcpinfo(rpc_t* rpc, void* c)
 	if (!tcp_disable){
 	if (!tcp_disable){
 		tcp_get_info(&ti);
 		tcp_get_info(&ti);
 		rpc->add(c, "{", &handle);
 		rpc->add(c, "{", &handle);
-		rpc->struct_add(handle, "ddd",
+		rpc->struct_add(handle, "dddd",
 			"readers", ti.tcp_readers,
 			"readers", ti.tcp_readers,
 			"max_connections", ti.tcp_max_connections,
 			"max_connections", ti.tcp_max_connections,
-			"opened_connections", ti.tcp_connections_no
+			"opened_connections", ti.tcp_connections_no,
+			"write_queued_bytes", ti.tcp_write_queued
 		);
 		);
 	}else{
 	}else{
 		rpc->fault(c, 500, "tcp support disabled");
 		rpc->fault(c, 500, "tcp support disabled");
@@ -561,8 +562,13 @@ static void core_tcp_options(rpc_t* rpc, void* c)
 	if (!tcp_disable){
 	if (!tcp_disable){
 		tcp_options_get(&t);
 		tcp_options_get(&t);
 		rpc->add(c, "{", &handle);
 		rpc->add(c, "{", &handle);
-		rpc->struct_add(handle, "ddddddddd",
+		rpc->struct_add(handle, "ddddddddddddd",
 			"fd_cache",		t.fd_cache,
 			"fd_cache",		t.fd_cache,
+			"tcp_buf_write",	t.tcp_buf_write,
+			"tcpconn_wq_max",	t.tcpconn_wq_max,
+			"tcp_wq_max",	t.tcp_wq_max,
+			"tcp_wq_timeout",	TICKS_TO_S(t.tcp_wq_timeout),
+			
 			"defer_accept",	t.defer_accept,
 			"defer_accept",	t.defer_accept,
 			"delayed_ack",	t.delayed_ack,
 			"delayed_ack",	t.delayed_ack,
 			"syncnt",		t.syncnt,
 			"syncnt",		t.syncnt,

+ 0 - 1
io_wait.h

@@ -842,7 +842,6 @@ again_devpoll2:
 					h->poll_method);
 					h->poll_method);
 			goto error;
 			goto error;
 	}
 	}
-	h->fd_no--;
 	return 0;
 	return 0;
 error:
 error:
 	return -1;
 	return -1;

+ 27 - 2
tcp_conn.h

@@ -34,6 +34,7 @@
  *  2007-07-26  improved tcp connection hash function; increased aliases
  *  2007-07-26  improved tcp connection hash function; increased aliases
  *               hash size (andrei)
  *               hash size (andrei)
  *  2007-11-26  switched to local_timer (andrei)
  *  2007-11-26  switched to local_timer (andrei)
+ *  2007-11-30  buffered write support (andrei)
  */
  */
 
 
 
 
@@ -41,6 +42,8 @@
 #ifndef _tcp_conn_h
 #ifndef _tcp_conn_h
 #define _tcp_conn_h
 #define _tcp_conn_h
 
 
+#include "tcp_options.h"
+
 #include "ip_addr.h"
 #include "ip_addr.h"
 #include "locking.h"
 #include "locking.h"
 #include "atomic_ops.h"
 #include "atomic_ops.h"
@@ -67,6 +70,7 @@
 #define F_CONN_NON_BLOCKING 1
 #define F_CONN_NON_BLOCKING 1
 #define F_CONN_REMOVED      2 /* no longer  in "main" listen fd list */
 #define F_CONN_REMOVED      2 /* no longer  in "main" listen fd list */
 #define F_CONN_READER       4 /* handled by a tcp reader */
 #define F_CONN_READER       4 /* handled by a tcp reader */
+#define F_CONN_WRITE_W      8 /* watched for write (main) */
 
 
 
 
 enum tcp_req_errors {	TCP_REQ_INIT, TCP_REQ_OK, TCP_READ_ERROR,
 enum tcp_req_errors {	TCP_REQ_INIT, TCP_REQ_OK, TCP_READ_ERROR,
@@ -86,7 +90,7 @@ enum tcp_conn_states { S_CONN_ERROR=-2, S_CONN_BAD=-1, S_CONN_OK=0,
 
 
 /* fd communication commands */
 /* fd communication commands */
 enum conn_cmds { CONN_DESTROY=-3, CONN_ERROR=-2, CONN_EOF=-1, CONN_RELEASE, 
 enum conn_cmds { CONN_DESTROY=-3, CONN_ERROR=-2, CONN_EOF=-1, CONN_RELEASE, 
-					CONN_GET_FD, CONN_NEW };
+					CONN_GET_FD, CONN_NEW, CONN_QUEUED_WRITE };
 /* CONN_RELEASE, EOF, ERROR, DESTROY can be used by "reader" processes
 /* CONN_RELEASE, EOF, ERROR, DESTROY can be used by "reader" processes
  * CONN_GET_FD, NEW, ERROR only by writers */
  * CONN_GET_FD, NEW, ERROR only by writers */
 
 
@@ -121,6 +125,23 @@ struct tcp_conn_alias{
 };
 };
 
 
 
 
+#ifdef TCP_BUF_WRITE
+	struct tcp_wbuffer{ /* one chunk of data queued for a later write */
+		struct tcp_wbuffer* next; /* next chunk in the queue, 0 for the last */
+		unsigned int b_size; /* usable size of buf, in bytes */
+		char buf[1]; /* data; the chunk is allocated with b_size-1 extra
+						bytes, so buf really holds b_size bytes */
+	};
+
+	struct tcp_wbuffer_queue{ /* per connection queue of pending writes */
+		struct tcp_wbuffer* first; /* oldest chunk, written first; 0 if the
+										queue is empty */
+		struct tcp_wbuffer* last; /* newest chunk, new data is appended here */
+		unsigned int queued; /* total size */
+		unsigned int offset; /* offset in the first wbuffer where data
+								starts */
+		unsigned int last_used; /* how much of the last buffer is used */
+	};
+#endif
+
 
 
 struct tcp_connection{
 struct tcp_connection{
 	int s; /*socket, used by "tcp main" */
 	int s; /*socket, used by "tcp main" */
@@ -137,7 +158,7 @@ struct tcp_connection{
 	enum tcp_conn_states state; /* connection state */
 	enum tcp_conn_states state; /* connection state */
 	void* extra_data; /* extra data associated to the connection, 0 for tcp*/
 	void* extra_data; /* extra data associated to the connection, 0 for tcp*/
 	struct timer_ln timer;
 	struct timer_ln timer;
-	unsigned int timeout;/* connection timeout, after this it will be removed*/
+	ticks_t timeout;/* connection timeout, after this it will be removed*/
 	unsigned id_hash; /* hash index in the id_hash */
 	unsigned id_hash; /* hash index in the id_hash */
 	struct tcp_connection* id_next; /* next, prev in id hash table */
 	struct tcp_connection* id_next; /* next, prev in id hash table */
 	struct tcp_connection* id_prev;
 	struct tcp_connection* id_prev;
@@ -145,6 +166,10 @@ struct tcp_connection{
 	struct tcp_connection* c_prev;
 	struct tcp_connection* c_prev;
 	struct tcp_conn_alias con_aliases[TCP_CON_MAX_ALIASES];
 	struct tcp_conn_alias con_aliases[TCP_CON_MAX_ALIASES];
 	int aliases; /* aliases number, at least 1 */
 	int aliases; /* aliases number, at least 1 */
+#ifdef TCP_BUF_WRITE
+	ticks_t last_write; /* time when the last write took place */
+	struct tcp_wbuffer_queue wbuf_q;
+#endif
 };
 };
 
 
 
 

+ 2 - 0
tcp_info.h

@@ -35,6 +35,8 @@ struct tcp_gen_info{
 	int tcp_readers;
 	int tcp_readers;
 	int tcp_max_connections;
 	int tcp_max_connections;
 	int tcp_connections_no; /* crt. number */
 	int tcp_connections_no; /* crt. number */
+	int tcp_write_queued; /* total bytes queued for write, 0 if no
+							 write queued support is enabled */
 };
 };
 
 
 
 

+ 500 - 29
tcp_main.c

@@ -87,6 +87,7 @@
  *  2007-11-27  added send fd cache and reader fd reuse (andrei)
  *  2007-11-27  added send fd cache and reader fd reuse (andrei)
  *  2007-11-28  added support for TCP_DEFER_ACCEPT, KEEPALIVE, KEEPINTVL,
  *  2007-11-28  added support for TCP_DEFER_ACCEPT, KEEPALIVE, KEEPINTVL,
  *               KEEPCNT, QUICKACK, SYNCNT, LINGER2 (andrei)
  *               KEEPCNT, QUICKACK, SYNCNT, LINGER2 (andrei)
+ *  2007-12-04  support for queueing write requests (andrei)
  */
  */
 
 
 
 
@@ -145,6 +146,7 @@
 
 
 #include "tcp_info.h"
 #include "tcp_info.h"
 #include "tcp_options.h"
 #include "tcp_options.h"
+#include "ut.h"
 
 
 #define local_malloc pkg_malloc
 #define local_malloc pkg_malloc
 #define local_free   pkg_free
 #define local_free   pkg_free
@@ -177,6 +179,12 @@
 #define TCPCONN_TIMEOUT_MIN_RUN 1  /* once per tick */
 #define TCPCONN_TIMEOUT_MIN_RUN 1  /* once per tick */
 #define TCPCONN_WAIT_TIMEOUT 1 /* 1 tick */
 #define TCPCONN_WAIT_TIMEOUT 1 /* 1 tick */
 
 
+#ifdef TCP_BUF_WRITE
+#define TCP_WBUF_SIZE	1024 /* FIXME: after debugging switch to 16-32k */
+static unsigned int* tcp_total_wq=0;
+#endif
+
+
 enum fd_types { F_NONE, F_SOCKINFO /* a tcp_listen fd */,
 enum fd_types { F_NONE, F_SOCKINFO /* a tcp_listen fd */,
 				F_TCPCONN, F_TCPCHILD, F_PROC };
 				F_TCPCONN, F_TCPCHILD, F_PROC };
 
 
@@ -542,6 +550,173 @@ end:
 
 
 
 
 
 
+inline static int _tcpconn_write_nb(int fd, struct tcp_connection* c,
+									char* buf, int len);
+
+
+#ifdef TCP_BUF_WRITE
+
+
+/* Appends size bytes from data to the write queue of c (unsafe version,
+ * the callers take c->write_lock around it).
+ * The request is refused if it would push the connection queue over
+ * tcp_options.tcpconn_wq_max or the global counter over
+ * tcp_options.tcp_wq_max, or if data is already queued and the last write
+ * is older than tcp_options.tcp_wq_timeout.
+ * returns 0 on success, -1 on error (limit exceeded, timeout or out of shm
+ * memory; on out of memory part of the data may already be queued) */
+inline static int wbufq_add(struct  tcp_connection* c, char* data, 
+							unsigned int size)
+{
+	struct tcp_wbuffer_queue* q;
+	struct tcp_wbuffer* wb;
+	unsigned int last_free;
+	unsigned int wb_size;
+	unsigned int crt_size;
+	ticks_t t;
+	
+	q=&c->wbuf_q;
+	t=get_ticks_raw();
+	if (unlikely(	((q->queued+size)>tcp_options.tcpconn_wq_max) ||
+					((*tcp_total_wq+size)>tcp_options.tcp_wq_max) ||
+					(q->first &&
+					TICKS_GT(t, c->last_write+tcp_options.tcp_wq_timeout)) )){
+		LOG(L_ERR, "ERROR: wbufq_add(%d bytes): write queue full or timeout "
+					" (%d, total %d, last write %d s ago)\n",
+					size, q->queued, *tcp_total_wq,
+					TICKS_TO_S(t-c->last_write));
+		goto error;
+	}
+	
+	if (unlikely(q->last==0)){
+		/* empty queue => allocate the first buffer, large enough to hold
+		 * the whole request */
+		wb_size=MAX_unsigned(TCP_WBUF_SIZE, size);
+		wb=shm_malloc(sizeof(*wb)+wb_size-1);
+		if (unlikely(wb==0))
+			goto error;
+		wb->b_size=wb_size;
+		wb->next=0;
+		q->last=wb;
+		q->first=wb;
+		q->last_used=0;
+		q->offset=0;
+		c->last_write=get_ticks_raw(); /* start with the crt. time */
+	}else{
+		wb=q->last;
+	}
+	
+	while(size){
+		last_free=wb->b_size-q->last_used;
+		if (last_free==0){
+			/* last buffer full => chain a new one */
+			wb_size=MAX_unsigned(TCP_WBUF_SIZE, size);
+			wb=shm_malloc(sizeof(*wb)+wb_size-1);
+			if (unlikely(wb==0))
+				goto error;
+			wb->b_size=wb_size;
+			wb->next=0;
+			q->last->next=wb;
+			q->last=wb;
+			q->last_used=0;
+			last_free=wb->b_size;
+		}
+		crt_size=MIN_unsigned(last_free, size);
+		/* append after the bytes already stored in the last buffer
+		 * (copying to wb->buf would overwrite data that is still queued) */
+		memcpy(wb->buf+q->last_used, data, crt_size);
+		q->last_used+=crt_size;
+		size-=crt_size;
+		data+=crt_size;
+		q->queued+=crt_size;
+		atomic_add_int((int*)tcp_total_wq, crt_size);
+	}
+	return 0;
+error:
+	return -1;
+}
+
+
+
+/* Frees all the buffers still linked in q, zeroes q and subtracts the
+ * number of bytes that were still queued from the global *tcp_total_wq
+ * counter. Does no locking on its own. */
+inline static void wbufq_destroy( struct  tcp_wbuffer_queue* q)
+{
+	struct tcp_wbuffer* crt;
+	struct tcp_wbuffer* following;
+	int freed_bytes;
+	
+	freed_bytes=0;
+	for (crt=q->first; crt; crt=following){
+		following=crt->next; /* saved before crt is freed */
+		/* only the used part of the last buffer counts as queued data */
+		if (crt==q->last)
+			freed_bytes+=q->last_used;
+		else
+			freed_bytes+=crt->b_size;
+		/* the first q->offset bytes of the first buffer were already sent */
+		if (crt==q->first)
+			freed_bytes-=q->offset;
+		shm_free(crt);
+	}
+	memset(q, 0, sizeof(*q));
+	atomic_add_int((int*)tcp_total_wq, -freed_bytes);
+}
+
+
+
+/* tries to empty the queue, writing the queued buffers on fd in order
+ * (takes c->write_lock for the whole run).
+ * returns -1 on a write error other than EAGAIN/EWOULDBLOCK, the number of
+ * bytes written otherwise (>=0).
+ * *empty is set to 1 if the whole queue was emptied and to 0 if not. */
+inline static int wbufq_run(int fd, struct tcp_connection* c, int* empty)
+{
+	struct tcp_wbuffer_queue* q;
+	struct tcp_wbuffer* wb;
+	int n;
+	int ret;
+	int block_size;
+	ticks_t t;
+	char* buf;
+	
+	*empty=0;
+	ret=0;
+	t=get_ticks_raw();
+	lock_get(&c->write_lock);
+	q=&c->wbuf_q;
+	while(q->first){
+		/* unsent part of the first buffer: only last_used bytes are valid
+		 * if it is also the last one, and the first offset bytes were
+		 * already written */
+		block_size=((q->first==q->last)?q->last_used:q->first->b_size)-
+						q->offset;
+		buf=q->first->buf+q->offset;
+		n=_tcpconn_write_nb(fd, c, buf, block_size);
+		if (likely(n>0)){
+			ret+=n;
+			/* any progress counts as a write, even a partial one;
+			 * otherwise a slow peer that accepts only partial writes
+			 * would look stalled to the timeout check in wbufq_add() */
+			c->last_write=t;
+			if (likely(n==block_size)){
+				/* whole buffer sent => unlink and free it */
+				wb=q->first;
+				q->first=q->first->next; 
+				shm_free(wb);
+				q->offset=0;
+				q->queued-=block_size;
+				atomic_add_int((int*)tcp_total_wq, -block_size);
+			}else{
+				/* partial write => remember where to continue and stop */
+				q->offset+=n;
+				q->queued-=n;
+				atomic_add_int((int*)tcp_total_wq, -n);
+				break;
+			}
+			c->state=S_CONN_OK;
+		}else{
+			if (n<0){
+				/* EINTR is handled inside _tcpconn_write_nb */
+				if (!(errno==EAGAIN || errno==EWOULDBLOCK)){
+					ret=-1;
+					LOG(L_ERR, "ERROR: wbuf_runq: %s [%d]\n",
+						strerror(errno), errno);
+				}
+			}
+			break;
+		}
+	}
+	if (likely(q->first==0)){
+		/* everything was written => reset the queue */
+		q->last=0;
+		q->last_used=0;
+		q->offset=0;
+		*empty=1;
+	}
+	if (unlikely(c->state==S_CONN_CONNECT && (ret>0)))
+			c->state=S_CONN_OK;
+	lock_release(&c->write_lock);
+	return ret;
+}
+
+#endif /* TCP_BUF_WRITE */
+
+
+
 #if 0
 #if 0
 /* blocking write even on non-blocking sockets 
 /* blocking write even on non-blocking sockets 
  * if TCP_TIMEOUT will return with error */
  * if TCP_TIMEOUT will return with error */
@@ -687,6 +862,10 @@ struct tcp_connection* tcpconn_connect( union sockaddr_union* server,
 	socklen_t my_name_len;
 	socklen_t my_name_len;
 	struct tcp_connection* con;
 	struct tcp_connection* con;
 	struct ip_addr ip;
 	struct ip_addr ip;
+	enum tcp_conn_states state;
+#ifdef TCP_BUF_WRITE
+	int n;
+#endif /* TCP_BUF_WRITE */
 
 
 	s=-1;
 	s=-1;
 	
 	
@@ -710,11 +889,30 @@ struct tcp_connection* tcpconn_connect( union sockaddr_union* server,
 	if (from && bind(s, &from->s, sockaddru_len(*from)) != 0)
 	if (from && bind(s, &from->s, sockaddru_len(*from)) != 0)
 		LOG(L_WARN, "WARNING: tcpconn_connect: binding to source address"
 		LOG(L_WARN, "WARNING: tcpconn_connect: binding to source address"
 					" failed: %s [%d]\n", strerror(errno), errno);
 					" failed: %s [%d]\n", strerror(errno), errno);
-
-	if (tcp_blocking_connect(s, &server->s, sockaddru_len(*server))<0){
-		LOG(L_ERR, "ERROR: tcpconn_connect: tcp_blocking_connect failed\n");
-		goto error;
+#ifdef TCP_BUF_WRITE
+	if (likely(tcp_options.tcp_buf_write)){
+again:
+		n=connect(s, &server->s, sockaddru_len(*server));
+		if (unlikely(n==-1)){
+			if (errno==EINTR) goto again;
+			if (errno!=EINPROGRESS && errno!=EALREADY){
+				LOG(L_ERR, "ERROR: tcpconn_connect: connect: (%d) %s\n",
+						errno, strerror(errno));
+				goto error;
+			}
+			state=S_CONN_CONNECT;
+		}
+	}else{
+#endif /* TCP_BUF_WRITE */
+		if (tcp_blocking_connect(s, &server->s, sockaddru_len(*server))<0){
+			LOG(L_ERR, "ERROR: tcpconn_connect: tcp_blocking_connect"
+						" failed\n");
+			goto error;
+		}
+		state=S_CONN_OK;
+#ifdef TCP_BUF_WRITE
 	}
 	}
+#endif /* TCP_BUF_WRITE */
 	if (from){
 	if (from){
 		su2ip_addr(&ip, from);
 		su2ip_addr(&ip, from);
 		if (!ip_addr_any(&ip))
 		if (!ip_addr_any(&ip))
@@ -746,7 +944,7 @@ skip:
 		else si=sendipv6_tcp;
 		else si=sendipv6_tcp;
 #endif
 #endif
 	}
 	}
-	con=tcpconn_new(s, server, from, si,  type, S_CONN_CONNECT);
+	con=tcpconn_new(s, server, from, si,  type, state);
 	if (con==0){
 	if (con==0){
 		LOG(L_ERR, "ERROR: tcp_connect: tcpconn_new failed, closing the "
 		LOG(L_ERR, "ERROR: tcp_connect: tcpconn_new failed, closing the "
 				 " socket\n");
 				 " socket\n");
@@ -818,6 +1016,10 @@ static inline void _tcpconn_detach(struct tcp_connection *c)
 
 
 static inline void _tcpconn_free(struct tcp_connection* c)
 static inline void _tcpconn_free(struct tcp_connection* c)
 {
 {
+#ifdef TCP_BUF_WRITE
+	if (unlikely(c->wbuf_q.first))
+		wbufq_destroy(&c->wbuf_q);
+#endif
 	lock_destroy(&c->write_lock);
 	lock_destroy(&c->write_lock);
 #ifdef USE_TLS
 #ifdef USE_TLS
 	if (unlikely(c->type==PROTO_TLS)) tls_tcpconn_clean(c);
 	if (unlikely(c->type==PROTO_TLS)) tls_tcpconn_clean(c);
@@ -1134,6 +1336,9 @@ int tcp_send(struct dest_info* dst, union sockaddr_union* from,
 	long response[2];
 	long response[2];
 	int n;
 	int n;
 	int do_close_fd;
 	int do_close_fd;
+#ifdef TCP_BUF_WRITE
+	int enable_write_watch;
+#endif /* TCP_BUF_WRITE */
 #ifdef TCP_FD_CACHE
 #ifdef TCP_FD_CACHE
 	struct fd_cache_entry* fd_cache_e;
 	struct fd_cache_entry* fd_cache_e;
 	
 	
@@ -1204,6 +1409,24 @@ no_id:
 			goto send_it;
 			goto send_it;
 		}
 		}
 get_fd:
 get_fd:
+#ifdef TCP_BUF_WRITE
+		/* if data is already queued, we don't need the fd any more */
+		if (unlikely(tcp_options.tcp_buf_write && c->wbuf_q.first)){
+			lock_get(&c->write_lock);
+				if (likely(c->wbuf_q.first)){
+					do_close_fd=0;
+					if (unlikely(wbufq_add(c, buf, len)<0)){
+						lock_release(&c->write_lock);
+						n=-1;
+						goto error;
+					}
+					n=len;
+					lock_release(&c->write_lock);
+					goto release_c;
+				}
+			lock_release(&c->write_lock);
+		}
+#endif /* TCP_BUF_WRITE */
 		/* check if this is not the same reader process holding
 		/* check if this is not the same reader process holding
 		 *  c  and if so send directly on c->fd */
 		 *  c  and if so send directly on c->fd */
 		if (c->reader_pid==my_pid()){
 		if (c->reader_pid==my_pid()){
@@ -1237,6 +1460,7 @@ get_fd:
 				LOG(L_ERR, "BUG: tcp_send: failed to get fd(receive_fd):"
 				LOG(L_ERR, "BUG: tcp_send: failed to get fd(receive_fd):"
 							" %s (%d)\n", strerror(errno), errno);
 							" %s (%d)\n", strerror(errno), errno);
 				n=-1;
 				n=-1;
+				do_close_fd=0;
 				goto release_c;
 				goto release_c;
 			}
 			}
 			if (unlikely(c!=tmp)){
 			if (unlikely(c!=tmp)){
@@ -1256,6 +1480,21 @@ get_fd:
 send_it:
 send_it:
 	DBG("tcp_send: sending...\n");
 	DBG("tcp_send: sending...\n");
 	lock_get(&c->write_lock);
 	lock_get(&c->write_lock);
+#ifdef TCP_BUF_WRITE
+	if (likely(tcp_options.tcp_buf_write)){
+		if (c->wbuf_q.first){
+			if (unlikely(wbufq_add(c, buf, len)<0)){
+				lock_release(&c->write_lock);
+				n=-1;
+				goto error;
+			}
+			lock_release(&c->write_lock);
+			n=len;
+			goto end;
+		}
+		n=_tcpconn_write_nb(fd, c, buf, len);
+	}else{
+#endif /* TCP_BUF_WRITE */
 #ifdef USE_TLS
 #ifdef USE_TLS
 	if (c->type==PROTO_TLS)
 	if (c->type==PROTO_TLS)
 		n=tls_blocking_write(c, fd, buf, len);
 		n=tls_blocking_write(c, fd, buf, len);
@@ -1263,10 +1502,39 @@ send_it:
 #endif
 #endif
 		/* n=tcp_blocking_write(c, fd, buf, len); */
 		/* n=tcp_blocking_write(c, fd, buf, len); */
 		n=tsend_stream(fd, buf, len, tcp_send_timeout*1000); 
 		n=tsend_stream(fd, buf, len, tcp_send_timeout*1000); 
+#ifdef TCP_BUF_WRITE
+	}
+#endif /* TCP_BUF_WRITE */
 	lock_release(&c->write_lock);
 	lock_release(&c->write_lock);
 	DBG("tcp_send: after write: c= %p n=%d fd=%d\n",c, n, fd);
 	DBG("tcp_send: after write: c= %p n=%d fd=%d\n",c, n, fd);
 	DBG("tcp_send: buf=\n%.*s\n", (int)len, buf);
 	DBG("tcp_send: buf=\n%.*s\n", (int)len, buf);
 	if (unlikely(n<0)){
 	if (unlikely(n<0)){
+#ifdef TCP_BUF_WRITE
+		if (tcp_options.tcp_buf_write && 
+				(errno==EAGAIN || errno==EWOULDBLOCK)){
+			lock_get(&c->write_lock);
+			enable_write_watch=(c->wbuf_q.first==0);
+			if (unlikely(wbufq_add(c, buf, len)<0)){
+				lock_release(&c->write_lock);
+				n=-1;
+				goto error;
+			}
+			lock_release(&c->write_lock);
+			n=len;
+			if (enable_write_watch){
+				response[0]=(long)c;
+				response[1]=CONN_QUEUED_WRITE;
+				if (send_all(unix_tcp_sock, response, sizeof(response))<=0){
+					LOG(L_ERR, "BUG: tcp_send: error return failed "
+							"(write):%s (%d)\n", strerror(errno), errno);
+					n=-1;
+					goto error;
+				}
+			}
+			goto end;
+		}
+error:
+#endif /* TCP_BUF_WRITE */
 		LOG(L_ERR, "ERROR: tcp_send: failed to send\n");
 		LOG(L_ERR, "ERROR: tcp_send: failed to send\n");
 		/* error on the connection , mark it as bad and set 0 timeout */
 		/* error on the connection , mark it as bad and set 0 timeout */
 		c->state=S_CONN_BAD;
 		c->state=S_CONN_BAD;
@@ -1294,6 +1562,13 @@ send_it:
 		if (do_close_fd) close(fd);
 		if (do_close_fd) close(fd);
 		return n; /* error return, no tcpconn_put */
 		return n; /* error return, no tcpconn_put */
 	}
 	}
+#ifdef TCP_BUF_WRITE
+	if (likely(tcp_options.tcp_buf_write)){
+		if (unlikely(c->state==S_CONN_CONNECT))
+			c->state=S_CONN_OK;
+		c->last_write=get_ticks_raw();
+	}
+#endif /* TCP_BUF_WRITE */
 end:
 end:
 #ifdef TCP_FD_CACHE
 #ifdef TCP_FD_CACHE
 	if (unlikely((fd_cache_e==0) && tcp_options.fd_cache)){
 	if (unlikely((fd_cache_e==0) && tcp_options.fd_cache)){
@@ -1465,23 +1740,40 @@ static void tcpconn_destroy(struct tcp_connection* tcpconn)
 	 *  (if the timer is already removed, nothing happens) */
 	 *  (if the timer is already removed, nothing happens) */
 	if (likely(!(tcpconn->flags & F_CONN_READER)))
 	if (likely(!(tcpconn->flags & F_CONN_READER)))
 		local_timer_del(&tcp_main_ltimer, &tcpconn->timer);
 		local_timer_del(&tcp_main_ltimer, &tcpconn->timer);
+#ifdef TCP_BUF_WRITE
+	if (unlikely((tcpconn->flags & F_CONN_WRITE_W) ||
+				!(tcpconn->flags & F_CONN_REMOVED))){
+		LOG(L_CRIT, "tcpconn_destroy: possible BUG: flags = %0x\n",
+					tcpconn->flags);
+	}
+	if (unlikely(tcpconn->wbuf_q.first)){
+		lock_get(&tcpconn->write_lock);
+			/* check again, while holding the lock */
+			if (likely(tcpconn->wbuf_q.first))
+				wbufq_destroy(&tcpconn->wbuf_q);
+		lock_release(&tcpconn->write_lock);
+	}
+#endif /* TCP_BUF_WRITE */
 	TCPCONN_LOCK; /*avoid races w/ tcp_send*/
 	TCPCONN_LOCK; /*avoid races w/ tcp_send*/
 	if (likely(atomic_dec_and_test(&tcpconn->refcnt))){ 
 	if (likely(atomic_dec_and_test(&tcpconn->refcnt))){ 
 		_tcpconn_detach(tcpconn);
 		_tcpconn_detach(tcpconn);
 		TCPCONN_UNLOCK;
 		TCPCONN_UNLOCK;
-		DBG("tcpconn_destroy: destroying connection %p, flags %04x\n",
-				tcpconn, tcpconn->flags);
+		DBG("tcpconn_destroy: destroying connection %p (%d, %d) flags %04x\n",
+				tcpconn, tcpconn->id, tcpconn->s, tcpconn->flags);
 		fd=tcpconn->s;
 		fd=tcpconn->s;
 #ifdef USE_TLS
 #ifdef USE_TLS
 		/*FIXME: lock ->writelock ? */
 		/*FIXME: lock ->writelock ? */
 		if (tcpconn->type==PROTO_TLS)
 		if (tcpconn->type==PROTO_TLS)
 			tls_close(tcpconn, fd);
 			tls_close(tcpconn, fd);
 #endif
 #endif
-		_tcpconn_free(tcpconn);
+		_tcpconn_free(tcpconn); /* destroys also the wbuf_q if still present*/
 #ifdef TCP_FD_CACHE
 #ifdef TCP_FD_CACHE
 		if (likely(tcp_options.fd_cache)) shutdown(fd, SHUT_RDWR);
 		if (likely(tcp_options.fd_cache)) shutdown(fd, SHUT_RDWR);
 #endif /* TCP_FD_CACHE */
 #endif /* TCP_FD_CACHE */
-		close(fd);
+		if (unlikely(close(fd)<0)){
+			LOG(L_ERR, "ERROR: tcpconn_destroy; close() failed: %s (%d)\n",
+					strerror(errno), errno);
+		}
 		(*tcp_connections_no)--;
 		(*tcp_connections_no)--;
 	}else{
 	}else{
 		TCPCONN_UNLOCK;
 		TCPCONN_UNLOCK;
@@ -1627,6 +1919,13 @@ inline static void send_fd_queue_run(struct tcp_send_fd_q* q)
 						   p->unix_sock, (long)(p-&q->data[0]), p->retries,
 						   p->unix_sock, (long)(p-&q->data[0]), p->retries,
 						   p->tcp_conn, p->tcp_conn->s, errno,
 						   p->tcp_conn, p->tcp_conn->s, errno,
 						   strerror(errno));
 						   strerror(errno));
+#ifdef TCP_BUF_WRITE
+				if (p->tcp_conn->flags & F_CONN_WRITE_W){
+					io_watch_del(&io_h, p->tcp_conn->s, -1, IO_FD_CLOSING);
+					p->tcp_conn->flags &=~F_CONN_WRITE_W;
+				}
+#endif
+				p->tcp_conn->flags &= ~F_CONN_READER;
 				tcpconn_destroy(p->tcp_conn);
 				tcpconn_destroy(p->tcp_conn);
 			}
 			}
 		}
 		}
@@ -1638,6 +1937,36 @@ inline static void send_fd_queue_run(struct tcp_send_fd_q* q)
 #endif
 #endif
 
 
 
 
+/* Single write attempt on a tcp connection, retried only when interrupted
+ * by a signal (EINTR). Unsafe version: it takes no lock, it should be called
+ * while holding c->write_lock. The fd should be non-blocking.
+ *  returns the number of bytes written on success, -1 on error (errno is
+ *  left as set by the failed call)
+ */
+inline static int _tcpconn_write_nb(int fd, struct tcp_connection* c,
+									char* buf, int len)
+{
+	int sent;
+#ifdef HAVE_MSG_NOSIGNAL
+	const int send_flags=MSG_NOSIGNAL;
+#else
+	const int send_flags=0;
+#endif /* HAVE_MSG_NOSIGNAL */
+	
+	do{
+#ifdef USE_TLS
+		if (unlikely(c->type==PROTO_TLS))
+			/* FIXME: tls_nonblocking_write !! */
+			sent=tls_blocking_write(c, fd, buf, len);
+		else
+#endif /* USE_TLS */
+			sent=send(fd, buf, len, send_flags);
+	}while(unlikely(sent<0) && (errno==EINTR));
+	return sent;
+}
+
+
 
 
 /* handles io from a tcp child process
 /* handles io from a tcp child process
  * params: tcp_c - pointer in the tcp_children array, to the entry for
  * params: tcp_c - pointer in the tcp_children array, to the entry for
@@ -1654,6 +1983,7 @@ inline static int handle_tcp_child(struct tcp_child* tcp_c, int fd_i)
 	long response[2];
 	long response[2];
 	int cmd;
 	int cmd;
 	int bytes;
 	int bytes;
+	int n;
 	ticks_t t;
 	ticks_t t;
 	
 	
 	if (unlikely(tcp_c->unix_sock<=0)){
 	if (unlikely(tcp_c->unix_sock<=0)){
@@ -1715,6 +2045,12 @@ inline static int handle_tcp_child(struct tcp_child* tcp_c, int fd_i)
 		case CONN_RELEASE:
 		case CONN_RELEASE:
 			tcp_c->busy--;
 			tcp_c->busy--;
 			if (unlikely(tcpconn->state==S_CONN_BAD)){ 
 			if (unlikely(tcpconn->state==S_CONN_BAD)){ 
+#ifdef TCP_BUF_WRITE
+				if (unlikely(tcpconn->flags & F_CONN_WRITE_W)){
+					io_watch_del(&io_h, tcpconn->s, -1, IO_FD_CLOSING);
+					tcpconn->flags &= ~F_CONN_WRITE_W;
+				}
+#endif /* TCP_BUF_WRITE */
 				tcpconn_destroy(tcpconn);
 				tcpconn_destroy(tcpconn);
 				break;
 				break;
 			}
 			}
@@ -1729,12 +2065,22 @@ inline static int handle_tcp_child(struct tcp_child* tcp_c, int fd_i)
 								tcp_con_lifetime, t);
 								tcp_con_lifetime, t);
 			/* must be after the de-ref*/
 			/* must be after the de-ref*/
 			tcpconn->flags&=~(F_CONN_REMOVED|F_CONN_READER);
 			tcpconn->flags&=~(F_CONN_REMOVED|F_CONN_READER);
-			if (unlikely(
-					io_watch_add(&io_h, tcpconn->s, POLLIN,
-												F_TCPCONN, tcpconn)<0)){
+#ifdef TCP_BUF_WRITE
+			if (unlikely(tcpconn->flags & F_CONN_WRITE_W))
+				n=io_watch_chg(&io_h, tcpconn->s, POLLIN| POLLOUT, -1);
+			else
+#endif /* TCP_BUF_WRITE */
+				n=io_watch_add(&io_h, tcpconn->s, POLLIN, F_TCPCONN, tcpconn);
+			if (unlikely(n<0)){
 				LOG(L_CRIT, "ERROR: tcp_main: handle_tcp_child: failed to add"
 				LOG(L_CRIT, "ERROR: tcp_main: handle_tcp_child: failed to add"
 						" new socket to the fd list\n");
 						" new socket to the fd list\n");
 				tcpconn->flags|=F_CONN_REMOVED;
 				tcpconn->flags|=F_CONN_REMOVED;
+#ifdef TCP_BUF_WRITE
+				if (unlikely(tcpconn->flags & F_CONN_WRITE_W)){
+					io_watch_del(&io_h, tcpconn->s, -1, IO_FD_CLOSING);
+					tcpconn->flags&=~F_CONN_WRITE_W;
+				}
+#endif /* TCP_BUF_WRITE */
 				tcpconn_destroy(tcpconn); /* closes also the fd */
 				tcpconn_destroy(tcpconn); /* closes also the fd */
 			}
 			}
 			DBG("handle_tcp_child: CONN_RELEASE  %p refcnt= %d\n", 
 			DBG("handle_tcp_child: CONN_RELEASE  %p refcnt= %d\n", 
@@ -1749,6 +2095,12 @@ inline static int handle_tcp_child(struct tcp_child* tcp_c, int fd_i)
 				 if (tcpconn->s!=-1)
 				 if (tcpconn->s!=-1)
 					io_watch_del(&io_h, tcpconn->s, -1, IO_FD_CLOSING);
 					io_watch_del(&io_h, tcpconn->s, -1, IO_FD_CLOSING);
 				*/
 				*/
+#ifdef TCP_BUF_WRITE
+				if ((tcpconn->flags & F_CONN_WRITE_W) && (tcpconn->s!=-1)){
+					io_watch_del(&io_h, tcpconn->s, -1, IO_FD_CLOSING);
+					tcpconn->flags&=~F_CONN_WRITE_W;
+				}
+#endif /* TCP_BUF_WRITE */
 				tcpconn_destroy(tcpconn); /* closes also the fd */
 				tcpconn_destroy(tcpconn); /* closes also the fd */
 				break;
 				break;
 		default:
 		default:
@@ -1785,6 +2137,7 @@ inline static int handle_ser_child(struct process_table* p, int fd_i)
 	int bytes;
 	int bytes;
 	int ret;
 	int ret;
 	int fd;
 	int fd;
+	int flags;
 	ticks_t t;
 	ticks_t t;
 	
 	
 	ret=-1;
 	ret=-1;
@@ -1844,10 +2197,15 @@ inline static int handle_ser_child(struct process_table* p, int fd_i)
 	}
 	}
 	switch(cmd){
 	switch(cmd){
 		case CONN_ERROR:
 		case CONN_ERROR:
-			if (!(tcpconn->flags & F_CONN_REMOVED) && (tcpconn->s!=-1)){
+			if ( (!(tcpconn->flags & F_CONN_REMOVED) ||
+					(tcpconn->flags & F_CONN_WRITE_W) ) && (tcpconn->s!=-1)){
 				io_watch_del(&io_h, tcpconn->s, -1, IO_FD_CLOSING);
 				io_watch_del(&io_h, tcpconn->s, -1, IO_FD_CLOSING);
 				tcpconn->flags|=F_CONN_REMOVED;
 				tcpconn->flags|=F_CONN_REMOVED;
+				tcpconn->flags&=~F_CONN_WRITE_W;
 			}
 			}
+			LOG(L_ERR, "handle_ser_child: ERROR: received CON_ERROR for %p"
+					" (id %d), refcnt %d\n", 
+					tcpconn, tcpconn->id, atomic_get(&tcpconn->refcnt));
 			tcpconn_destroy(tcpconn); /* will close also the fd */
 			tcpconn_destroy(tcpconn); /* will close also the fd */
 			break;
 			break;
 		case CONN_GET_FD:
 		case CONN_GET_FD:
@@ -1879,15 +2237,53 @@ inline static int handle_ser_child(struct process_table* p, int fd_i)
 			local_timer_add(&tcp_main_ltimer, &tcpconn->timer, 
 			local_timer_add(&tcp_main_ltimer, &tcpconn->timer, 
 								tcp_con_lifetime, t);
 								tcp_con_lifetime, t);
 			tcpconn->flags&=~F_CONN_REMOVED;
 			tcpconn->flags&=~F_CONN_REMOVED;
+			flags=POLLIN 
+#ifdef TCP_BUF_WRITE
+					/* not used for now, the connection is sent to tcp_main
+					 * before knowing if we can write on it or we should 
+					 * wait */
+					| (((int)!(tcpconn->flags & F_CONN_WRITE_W)-1) & POLLOUT)
+#endif /* TCP_BUF_WRITE */
+					;
 			if (unlikely(
 			if (unlikely(
-					io_watch_add(&io_h, tcpconn->s, POLLIN,
+					io_watch_add(&io_h, tcpconn->s, flags,
 												F_TCPCONN, tcpconn)<0)){
 												F_TCPCONN, tcpconn)<0)){
 				LOG(L_CRIT, "ERROR: tcp_main: handle_ser_child: failed to add"
 				LOG(L_CRIT, "ERROR: tcp_main: handle_ser_child: failed to add"
 						" new socket to the fd list\n");
 						" new socket to the fd list\n");
 				tcpconn->flags|=F_CONN_REMOVED;
 				tcpconn->flags|=F_CONN_REMOVED;
+				tcpconn->flags&=~F_CONN_WRITE_W;
 				tcpconn_destroy(tcpconn); /* closes also the fd */
 				tcpconn_destroy(tcpconn); /* closes also the fd */
 			}
 			}
 			break;
 			break;
+#ifdef TCP_BUF_WRITE
+		case CONN_QUEUED_WRITE:
+			if (!(tcpconn->flags & F_CONN_WRITE_W)){
+				if (tcpconn->flags& F_CONN_REMOVED){
+					if (unlikely(io_watch_add(&io_h, tcpconn->s, POLLOUT,
+												F_TCPCONN, tcpconn)<0)){
+						LOG(L_CRIT, "ERROR: tcp_main: handle_ser_child: failed"
+								    " to enable write watch on socket\n");
+						tcpconn_destroy(tcpconn);
+						break;
+					}
+				}else{
+					if (unlikely(io_watch_chg(&io_h, tcpconn->s,
+												POLLIN|POLLOUT, -1)<0)){
+						LOG(L_CRIT, "ERROR: tcp_main: handle_ser_child: failed"
+								    " to change socket watch events\n");
+						io_watch_del(&io_h, tcpconn->s, -1, IO_FD_CLOSING);
+						tcpconn->flags|=F_CONN_REMOVED;
+						tcpconn_destroy(tcpconn);
+						break;
+					}
+				}
+				tcpconn->flags|=F_CONN_WRITE_W;
+			}else{
+				LOG(L_WARN, "tcp_main: hanlder_ser_child: connection %p"
+							" already watched for write\n", tcpconn);
+			}
+			break;
+#endif /* TCP_BUF_WRITE */
 		default:
 		default:
 			LOG(L_CRIT, "BUG: handle_ser_child: unknown cmd %d\n", cmd);
 			LOG(L_CRIT, "BUG: handle_ser_child: unknown cmd %d\n", cmd);
 	}
 	}
@@ -2056,6 +2452,7 @@ static inline int handle_new_connect(struct socket_info* si)
 		if(unlikely(send2child(tcpconn)<0)){
 		if(unlikely(send2child(tcpconn)<0)){
 			LOG(L_ERR,"ERROR: handle_new_connect: no children "
 			LOG(L_ERR,"ERROR: handle_new_connect: no children "
 					"available\n");
 					"available\n");
+			tcpconn->flags&=~F_CONN_READER;
 			tcpconn_destroy(tcpconn);
 			tcpconn_destroy(tcpconn);
 		}
 		}
 #endif
 #endif
@@ -2075,13 +2472,17 @@ static inline int handle_new_connect(struct socket_info* si)
  * params: tcpconn - pointer to the tcp_connection for which we have an io ev.
  * params: tcpconn - pointer to the tcp_connection for which we have an io ev.
  *         fd_i    - index in the fd_array table (needed for delete)
  *         fd_i    - index in the fd_array table (needed for delete)
  * returns:  handle_* return convention, but on success it always returns 0
  * returns:  handle_* return convention, but on success it always returns 0
- *           (because it's one-shot, after a succesfull execution the fd is
+ *           (because it's one-shot, after a successful execution the fd is
  *            removed from tcp_main's watch fd list and passed to a child =>
  *            removed from tcp_main's watch fd list and passed to a child =>
  *            tcp_main is not interested in further io events that might be
  *            tcp_main is not interested in further io events that might be
  *            queued for this fd)
  *            queued for this fd)
  */
  */
-inline static int handle_tcpconn_ev(struct tcp_connection* tcpconn, int fd_i)
+inline static int handle_tcpconn_ev(struct tcp_connection* tcpconn, short ev, 
+										int fd_i)
 {
 {
+#ifdef TCP_BUF_WRITE
+	int empty_q;
+#endif /* TCP_BUF_WRITE */
 	/*  is refcnt!=0 really necessary? 
 	/*  is refcnt!=0 really necessary? 
 	 *  No, in fact it's a bug: I can have the following situation: a send only
 	 *  No, in fact it's a bug: I can have the following situation: a send only
 	 *   tcp connection used by n processes simultaneously => refcnt = n. In 
 	 *   tcp connection used by n processes simultaneously => refcnt = n. In 
@@ -2101,17 +2502,55 @@ inline static int handle_tcpconn_ev(struct tcp_connection* tcpconn, int fd_i)
 #endif
 #endif
 	/* pass it to child, so remove it from the io watch list  and the local
 	/* pass it to child, so remove it from the io watch list  and the local
 	 *  timer */
 	 *  timer */
-	DBG("handle_tcpconn_ev: data available on %p %d\n", tcpconn, tcpconn->s);
-	if (unlikely(io_watch_del(&io_h, tcpconn->s, fd_i, 0)==-1)) goto error;
-	tcpconn->flags|=F_CONN_REMOVED|F_CONN_READER;
-	local_timer_del(&tcp_main_ltimer, &tcpconn->timer);
-	tcpconn_ref(tcpconn); /* refcnt ++ */
-	if (unlikely(send2child(tcpconn)<0)){
-		LOG(L_ERR,"ERROR: handle_tcpconn_ev: no children available\n");
-		tcpconn_destroy(tcpconn);
+	DBG("handle_tcpconn_ev: ev (%0x) on %p %d\n", ev, tcpconn, tcpconn->s);
+#ifdef TCP_BUF_WRITE
+	if (unlikely((ev & POLLOUT) && (tcpconn->flags & F_CONN_WRITE_W))){
+		if (unlikely(wbufq_run(tcpconn->s, tcpconn, &empty_q)<0)){
+			io_watch_del(&io_h, tcpconn->s, fd_i, 0);
+			tcpconn->flags|=F_CONN_REMOVED;
+			tcpconn->flags&=~F_CONN_WRITE_W;
+			tcpconn_destroy(tcpconn);
+			goto error;
+		}
+		if (empty_q){
+			if (tcpconn->flags & F_CONN_REMOVED){
+				if (unlikely(io_watch_del(&io_h, tcpconn->s, fd_i, 0)==-1))
+					goto error;
+			}else{
+				if (unlikely(io_watch_chg(&io_h, tcpconn->s,
+											POLLIN, fd_i)==-1))
+					goto error;
+			}
+		}
+	}
+	if (likely((ev & POLLIN) && !(tcpconn->flags & F_CONN_REMOVED))){
+		if (unlikely(tcpconn->flags & F_CONN_WRITE_W)){
+			if (unlikely(io_watch_chg(&io_h, tcpconn->s, POLLOUT, fd_i)==-1))
+				goto error;
+		}else
+#else
+	{
+#endif /* TCP_BUF_WRITE */
+			if (unlikely(io_watch_del(&io_h, tcpconn->s, fd_i, 0)==-1))
+				goto error;
+		tcpconn->flags|=F_CONN_REMOVED|F_CONN_READER;
+		local_timer_del(&tcp_main_ltimer, &tcpconn->timer);
+		tcpconn_ref(tcpconn); /* refcnt ++ */
+		if (unlikely(send2child(tcpconn)<0)){
+			LOG(L_ERR,"ERROR: handle_tcpconn_ev: no children available\n");
+			tcpconn->flags&=~F_CONN_READER;
+#ifdef TCP_BUF_WRITE
+			if (tcpconn->flags & F_CONN_WRITE_W){
+				io_watch_del(&io_h, tcpconn->s, fd_i, 0);
+				tcpconn->flags&=~F_CONN_WRITE_W;
+			}
+#endif /* TCP_BUF_WRITE */
+			tcpconn_destroy(tcpconn);
+		}
 	}
 	}
 	return 0; /* we are not interested in possibly queued io events, 
 	return 0; /* we are not interested in possibly queued io events, 
-				 the fd was either passed to a child, or closed */
+				 the fd was either passed to a child, closed, or for writes,
+				 everything possible was already written */
 error:
 error:
 	return -1;
 	return -1;
 }
 }
@@ -2131,7 +2570,7 @@ error:
  *         >0 on successful read from the fd (when there might be more io
 *         >0 on successful read from the fd (when there might be more io
  *            queued -- the receive buffer might still be non-empty)
  *            queued -- the receive buffer might still be non-empty)
  */
  */
-inline static int handle_io(struct fd_map* fm, short events, int idx)
+inline static int handle_io(struct fd_map* fm, short ev, int idx)
 {	
 {	
 	int ret;
 	int ret;
 	
 	
@@ -2140,7 +2579,7 @@ inline static int handle_io(struct fd_map* fm, short events, int idx)
 			ret=handle_new_connect((struct socket_info*)fm->data);
 			ret=handle_new_connect((struct socket_info*)fm->data);
 			break;
 			break;
 		case F_TCPCONN:
 		case F_TCPCONN:
-			ret=handle_tcpconn_ev((struct tcp_connection*)fm->data, idx);
+			ret=handle_tcpconn_ev((struct tcp_connection*)fm->data, ev, idx);
 			break;
 			break;
 		case F_TCPCHILD:
 		case F_TCPCHILD:
 			ret=handle_tcp_child((struct tcp_child*)fm->data, idx);
 			ret=handle_tcp_child((struct tcp_child*)fm->data, idx);
@@ -2185,9 +2624,16 @@ static ticks_t tcpconn_main_timeout(ticks_t t, struct timer_ln* tl, void* data)
 				TCPCONN_UNLOCK; /* unlock as soon as possible */
 				TCPCONN_UNLOCK; /* unlock as soon as possible */
 				fd=c->s;
 				fd=c->s;
 				if (likely(fd>0)){
 				if (likely(fd>0)){
-					if (likely(!(c->flags & F_CONN_REMOVED))){
+					if (likely(!(c->flags & F_CONN_REMOVED)
+#ifdef TCP_BUF_WRITE
+								|| (c->flags & F_CONN_WRITE_W)
+#endif /* TCP_BUF_WRITE */
+								)){
 						io_watch_del(&io_h, fd, -1, IO_FD_CLOSING);
 						io_watch_del(&io_h, fd, -1, IO_FD_CLOSING);
 						c->flags|=F_CONN_REMOVED;
 						c->flags|=F_CONN_REMOVED;
+#ifdef TCP_BUF_WRITE
+						c->flags&=~F_CONN_WRITE_W;
+#endif /* TCP_BUF_WRITE */
 					}
 					}
 #ifdef USE_TLS
 #ifdef USE_TLS
 					if (unlikely(c->type==PROTO_TLS ))
 					if (unlikely(c->type==PROTO_TLS ))
@@ -2250,9 +2696,16 @@ static inline void tcpconn_destroy_all()
 						local_timer_del(&tcp_main_ltimer, &c->timer);
 						local_timer_del(&tcp_main_ltimer, &c->timer);
 					/* else still in some reader */
 					/* else still in some reader */
 					fd=c->s;
 					fd=c->s;
-					if (fd>0 && !(c->flags & F_CONN_REMOVED)){
+					if (fd>0 && (!(c->flags & F_CONN_REMOVED)
+#ifdef TCP_BUF_WRITE
+								|| (c->flags & F_CONN_WRITE_W)
+#endif /* TCP_BUF_WRITE */
+								)){
 						io_watch_del(&io_h, fd, -1, IO_FD_CLOSING);
 						io_watch_del(&io_h, fd, -1, IO_FD_CLOSING);
 						c->flags|=F_CONN_REMOVED;
 						c->flags|=F_CONN_REMOVED;
+#ifdef TCP_BUF_WRITE
+						c->flags&=~F_CONN_WRITE_W;
+#endif /* TCP_BUF_WRITE */
 					}
 					}
 				}else{
 				}else{
 					fd=-1;
 					fd=-1;
@@ -2456,6 +2909,12 @@ void destroy_tcp()
 			shm_free(tcp_connections_no);
 			shm_free(tcp_connections_no);
 			tcp_connections_no=0;
 			tcp_connections_no=0;
 		}
 		}
+#ifdef TCP_BUF_WRITE
+		if (tcp_total_wq){
+			shm_free(tcp_total_wq);
+			tcp_total_wq=0;
+		}
+#endif /* TCP_BUF_WRITE */
 		if (connection_id){
 		if (connection_id){
 			shm_free(connection_id);
 			shm_free(connection_id);
 			connection_id=0;
 			connection_id=0;
@@ -2508,6 +2967,13 @@ int init_tcp()
 		goto error;
 		goto error;
 	}
 	}
 	*connection_id=1;
 	*connection_id=1;
+#ifdef TCP_BUF_WRITE
+	tcp_total_wq=shm_malloc(sizeof(*tcp_total_wq));
+	if (tcp_total_wq==0){
+		LOG(L_CRIT, "ERROR: init_tcp: could not alloc globals\n");
+		goto error;
+	}
+#endif /* TCP_BUF_WRITE */
 	/* alloc hashtables*/
 	/* alloc hashtables*/
 	tcpconn_aliases_hash=(struct tcp_conn_alias**)
 	tcpconn_aliases_hash=(struct tcp_conn_alias**)
 			shm_malloc(TCP_ALIAS_HASH_SIZE* sizeof(struct tcp_conn_alias*));
 			shm_malloc(TCP_ALIAS_HASH_SIZE* sizeof(struct tcp_conn_alias*));
@@ -2675,6 +3141,11 @@ void tcp_get_info(struct tcp_gen_info *ti)
 	ti->tcp_readers=tcp_children_no;
 	ti->tcp_readers=tcp_children_no;
 	ti->tcp_max_connections=tcp_max_connections;
 	ti->tcp_max_connections=tcp_max_connections;
 	ti->tcp_connections_no=*tcp_connections_no;
 	ti->tcp_connections_no=*tcp_connections_no;
+#ifdef TCP_BUF_WRITE
+	ti->tcp_write_queued=*tcp_total_wq;
+#else
+	ti->tcp_write_queued=0;
+#endif /* TCP_BUF_WRITE */
 }
 }
 
 
 #endif
 #endif

+ 17 - 3
tcp_options.c

@@ -25,6 +25,8 @@
 
 
 #include "tcp_options.h"
 #include "tcp_options.h"
 #include "dprint.h"
 #include "dprint.h"
+#include "globals.h"
+#include "timer_ticks.h"
 
 
 
 
 struct tcp_cfg_options tcp_options;
 struct tcp_cfg_options tcp_options;
@@ -33,7 +35,12 @@ struct tcp_cfg_options tcp_options;
 /* set defaults */
 /* set defaults */
 void init_tcp_options()
 void init_tcp_options()
 {
 {
-
+#ifdef TCP_BUF_WRITE
+	tcp_options.tcp_buf_write=0;
+	tcp_options.tcpconn_wq_max=32*1024; /* 32 k */
+	tcp_options.tcp_wq_max=10*1024*1024; /* 10 MB */
+	tcp_options.tcp_wq_timeout=S_TO_TICKS(tcp_send_timeout);
+#endif
 #ifdef TCP_FD_CACHE
 #ifdef TCP_FD_CACHE
 	tcp_options.fd_cache=1;
 	tcp_options.fd_cache=1;
 #endif
 #endif
@@ -54,7 +61,7 @@ void init_tcp_options()
 
 
 #define W_OPT_NC(option) \
 #define W_OPT_NC(option) \
 	if (tcp_options.option){\
 	if (tcp_options.option){\
-		WARN("tcp_options: tcp_" ##option \
+		WARN("tcp_options: tcp_" #option \
 				"cannot be enabled (recompile needed)\n"); \
 				"cannot be enabled (recompile needed)\n"); \
 		tcp_options.option=0; \
 		tcp_options.option=0; \
 	}
 	}
@@ -63,7 +70,7 @@ void init_tcp_options()
 
 
 #define W_OPT_NS(option) \
 #define W_OPT_NS(option) \
 	if (tcp_options.option){\
 	if (tcp_options.option){\
-		WARN("tcp_options: tcp_" ##option \
+		WARN("tcp_options: tcp_" #option \
 				"cannot be enabled (no OS support)\n"); \
 				"cannot be enabled (no OS support)\n"); \
 		tcp_options.option=0; \
 		tcp_options.option=0; \
 	}
 	}
@@ -76,6 +83,13 @@ void tcp_options_check()
 	W_OPT_NC(defer_accept);
 	W_OPT_NC(defer_accept);
 #endif
 #endif
 
 
+#ifndef TCP_BUF_WRITE
+	W_OPT_NC(tcp_buf_write);
+	W_OPT_NC(tcpconn_wq_max);
+	W_OPT_NC(tcp_wq_max);
+	W_OPT_NC(tcp_wq_timeout);
+#endif /* TCP_BUF_WRITE */
+
 #if ! defined HAVE_TCP_DEFER_ACCEPT && ! defined HAVE_TCP_ACCEPT_FILTER
 #if ! defined HAVE_TCP_DEFER_ACCEPT && ! defined HAVE_TCP_ACCEPT_FILTER
 	W_OPT_NS(defer_accept);
 	W_OPT_NS(defer_accept);
 #endif
 #endif

+ 11 - 0
tcp_options.h

@@ -26,6 +26,11 @@
 #ifndef tcp_options_h
 #ifndef tcp_options_h
 #define tcp_options_h
 #define tcp_options_h
 
 
+
+#ifndef NO_TCP_BUF_WRITE
+#define TCP_BUF_WRITE /* enabled buffered writing */
+#endif 
+
 #ifndef NO_TCP_FD_CACHE
 #ifndef NO_TCP_FD_CACHE
 #define TCP_FD_CACHE /* enable fd caching */
 #define TCP_FD_CACHE /* enable fd caching */
 #endif
 #endif
@@ -95,6 +100,12 @@
 struct tcp_cfg_options{
 struct tcp_cfg_options{
 	/* ser tcp options */
 	/* ser tcp options */
 	int fd_cache; /* on /off */
 	int fd_cache; /* on /off */
+	/* tcp buf. write options */
+	int tcp_buf_write; /* on / off */
+	unsigned int tcpconn_wq_max; /* maximum queue len per connection */
+	unsigned int tcp_wq_max; /* maximum overall queued bytes */
+	unsigned int tcp_wq_timeout;      /* timeout for queue writes */
+
 	/* tcp socket options */
 	/* tcp socket options */
 	int defer_accept; /* on / off */
 	int defer_accept; /* on / off */
 	int delayed_ack; /* delay ack on connect */ 
 	int delayed_ack; /* delay ack on connect */