浏览代码

tcp: new tls hooks interface and async tls changes

- new tls hooks interface that better accommodates tls async use.
  Changed read() (takes an extra flags parameter now), removed
  blocking_write() and fix_read_con(), added do_send() and
  fst_send() (both of them handle snd_flags now and might return a
  command that should be sent to tcp_main).
- more tcp send functions (tcpconn_1st_send(),
  tcpconn_send_unsafe()) and more send functions exported
  (tls_int_send.h) for use from the tls module.
- split tcp_read() into tcp_read() and tcp_read_data() and
  exported tcp_read_data() (tcp_read.h).
- support for repeating a tcp_read() if indicated
  (RD_CONN_REPEAT_READ), needed for tls.
Andrei Pelinescu-Onciul 15 年之前
父节点
当前提交
ce51fbb84e
共有 5 个文件被更改,包括 579 次插入343 次删除
  1. 50 0
      tcp_int_send.h
  2. 374 277
      tcp_main.c
  3. 94 50
      tcp_read.c
  4. 46 0
      tcp_read.h
  5. 15 16
      tls_hooks.h

+ 50 - 0
tcp_int_send.h

@@ -0,0 +1,50 @@
+/* 
+ * $Id$
+ * 
+ * Copyright (C) 2010 iptelorg GmbH
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+/** internal tcp send functions (use with care).
+ * @file tcp_int_send.h
+ */
+/*
+ * History:
+ * --------
+ *  2010-03-23  initial version (andrei)
+*/
+
+#ifndef __tcp_int_send_h
+#define __tcp_int_send_h
+
+#include "tcp_conn.h"
+
+int tcpconn_do_send(int fd, struct tcp_connection* c,
+							char* buf, unsigned len,
+							snd_flags_t send_flags, long* resp, int locked);
+
+int tcpconn_1st_send(int fd, struct tcp_connection* c,
+							char* buf, unsigned len,
+							snd_flags_t send_flags, long* resp, int locked);
+
+int tcpconn_send_unsafe(int fd, struct tcp_connection *c,
+						char* buf, unsigned len, snd_flags_t send_flags);
+
+/* direct non-blocking, unsafe (assumes locked) send on a tcp connection */
+int _tcpconn_write_nb(int fd, struct tcp_connection* c,
+									char* buf, int len);
+
+
+#endif /*__tcp_int_send_h*/
+
+/* vi: set ts=4 sw=4 tw=79:ai:cindent: */

+ 374 - 277
tcp_main.c

@@ -103,13 +103,13 @@
  *  2009-04-09  tcp ev and tcp stats macros added (andrei)
  *  2009-04-09  tcp ev and tcp stats macros added (andrei)
  *  2009-09-15  support for force connection reuse and close after send
  *  2009-09-15  support for force connection reuse and close after send
  *               send flags (andrei)
  *               send flags (andrei)
+ *  2010-03-23  tcp_send() split in 3 smaller functions (andrei)
  */
  */
 
 
-/*!
- * \file
- * \brief SIP-router core :: 
- * \ingroup core
- * Module: \ref core
+/** tcp main/dispatcher and tcp send functions.
+ * @file tcp_main.c
+ * @ingroup core
+ * Module: @ref core
  */
  */
 
 
 
 
@@ -165,6 +165,7 @@
 #include "sr_module.h"
 #include "sr_module.h"
 #include "tcp_server.h"
 #include "tcp_server.h"
 #include "tcp_init.h"
 #include "tcp_init.h"
+#include "tcp_int_send.h"
 #include "tcp_stats.h"
 #include "tcp_stats.h"
 #include "tcp_ev.h"
 #include "tcp_ev.h"
 #include "tsend.h"
 #include "tsend.h"
@@ -631,10 +632,6 @@ end:
 
 
 
 
 
 
-inline static int _tcpconn_write_nb(int fd, struct tcp_connection* c,
-									char* buf, int len);
-
-
 #ifdef TCP_ASYNC
 #ifdef TCP_ASYNC
 
 
 
 
@@ -1717,10 +1714,6 @@ inline static void tcp_fd_cache_add(struct tcp_connection *c, int fd)
 
 
 inline static int tcpconn_chld_put(struct tcp_connection* tcpconn);
 inline static int tcpconn_chld_put(struct tcp_connection* tcpconn);
 
 
-static int tcpconn_do_send(int fd, struct tcp_connection* c,
-							char* buf, unsigned len,
-							snd_flags_t send_flags, long* resp);
-
 static int tcpconn_send_put(struct tcp_connection* c, char* buf, unsigned len,
 static int tcpconn_send_put(struct tcp_connection* c, char* buf, unsigned len,
 							snd_flags_t send_flags);
 							snd_flags_t send_flags);
 
 
@@ -1766,257 +1759,201 @@ int tcp_send(struct dest_info* dst, union sockaddr_union* from,
 			}
 			}
 		}
 		}
 	}
 	}
-/* no_id: */
-		if (unlikely((c==0) || tcpconn_close_after_send(c))){
-			if (unlikely(c)){
-				/* can't use c if it's marked as close-after-send  =>
-				   release it and try opening new one */
-				tcpconn_chld_put(c); /* release c (dec refcnt & free on 0) */
-				c=0;
-			}
-			/* check if connect() is disabled */
-			if (unlikely((dst->send_flags.f & SND_F_FORCE_CON_REUSE) ||
-							cfg_get(tcp, tcp_cfg, no_connect)))
-				return -1;
-			DBG("tcp_send: no open tcp connection found, opening new one\n");
-			/* create tcp connection */
-			if (likely(from==0)){
-				/* check to see if we have to use a specific source addr. */
-				switch (dst->to.s.sa_family) {
-					case AF_INET:
-							from = tcp_source_ipv4;
-						break;
+	/* connection not found or unusable => open a new one and send on it */
+	if (unlikely((c==0) || tcpconn_close_after_send(c))){
+		if (unlikely(c)){
+			/* can't use c if it's marked as close-after-send  =>
+			   release it and try opening new one */
+			tcpconn_chld_put(c); /* release c (dec refcnt & free on 0) */
+			c=0;
+		}
+		/* check if connect() is disabled */
+		if (unlikely((dst->send_flags.f & SND_F_FORCE_CON_REUSE) ||
+						cfg_get(tcp, tcp_cfg, no_connect)))
+			return -1;
+		DBG("tcp_send: no open tcp connection found, opening new one\n");
+		/* create tcp connection */
+		if (likely(from==0)){
+			/* check to see if we have to use a specific source addr. */
+			switch (dst->to.s.sa_family) {
+				case AF_INET:
+						from = tcp_source_ipv4;
+					break;
 #ifdef USE_IPV6
 #ifdef USE_IPV6
-					case AF_INET6:
-							from = tcp_source_ipv6;
-						break;
+				case AF_INET6:
+						from = tcp_source_ipv6;
+					break;
 #endif
 #endif
-					default:
-						/* error, bad af, ignore ... */
-						break;
-				}
+				default:
+					/* error, bad af, ignore ... */
+					break;
 			}
 			}
+		}
 #if defined(TCP_CONNECT_WAIT) && defined(TCP_ASYNC)
 #if defined(TCP_CONNECT_WAIT) && defined(TCP_ASYNC)
-			if (likely(cfg_get(tcp, tcp_cfg, tcp_connect_wait) && 
-						cfg_get(tcp, tcp_cfg, async) )){
-				if (unlikely(*tcp_connections_no >=
-								cfg_get(tcp, tcp_cfg, max_connections))){
-					LOG(L_ERR, "ERROR: tcp_send %s: maximum number of"
-								" connections exceeded (%d/%d)\n",
-								su2a(&dst->to, sizeof(dst->to)),
-								*tcp_connections_no,
-								cfg_get(tcp, tcp_cfg, max_connections));
-					return -1;
-				}
-				c=tcpconn_new(-1, &dst->to, from, 0, dst->proto,
-								S_CONN_CONNECT);
-				if (unlikely(c==0)){
-					LOG(L_ERR, "ERROR: tcp_send %s: could not create new"
-							" connection\n",
-							su2a(&dst->to, sizeof(dst->to)));
-					return -1;
-				}
-				c->flags|=F_CONN_PENDING|F_CONN_FD_CLOSED;
-				tcpconn_set_send_flags(c, dst->send_flags);
-				atomic_set(&c->refcnt, 2); /* ref from here and from main hash
-											 table */
-				/* add it to id hash and aliases */
-				if (unlikely(tcpconn_add(c)==0)){
-					LOG(L_ERR, "ERROR: tcp_send %s: could not add "
-								"connection %p\n",
-								su2a(&dst->to, sizeof(dst->to)),
-									c);
-					_tcpconn_free(c);
-					n=-1;
-					goto end_no_conn;
-				}
-				/* do connect and if src ip or port changed, update the 
-				 * aliases */
-				if (unlikely((fd=tcpconn_finish_connect(c, from))<0)){
-					/* tcpconn_finish_connect will automatically blacklist
-					   on error => no need to do it here */
-					LOG(L_ERR, "ERROR: tcp_send %s: tcpconn_finish_connect(%p)"
-							" failed\n", su2a(&dst->to, sizeof(dst->to)),
-								c);
-					goto conn_wait_error;
-				}
-				/* ? TODO: it might be faster just to queue the write directly
-				 *  and send to main CONN_NEW_PENDING_WRITE */
-				/* delay sending the fd to main after the send */
-				
-				/* NOTE: no lock here, because the connection is marked as
-				 * pending and nobody else will try to write on it. However
-				 * this might produce out-of-order writes. If this is not
-				 * desired either lock before the write or use 
-				 * _wbufq_insert(...) */
-				n=_tcpconn_write_nb(fd, c, buf, len);
-				if (unlikely(n<(int)len)){
-					if ((n>=0) || errno==EAGAIN || errno==EWOULDBLOCK){
-						DBG("tcp_send: pending write on new connection %p "
-								" (%d/%d bytes written)\n", c, n, len);
-						if (n<0) n=0;
-						else{
-							TCP_STATS_ESTABLISHED(S_CONN_CONNECT);
-							c->state=S_CONN_OK; /* partial write => connect()
-													ended */
-						}
-						/* add to the write queue */
-						lock_get(&c->write_lock);
-							if (unlikely(_wbufq_insert(c, buf+n, len-n)<0)){
-								lock_release(&c->write_lock);
-								n=-1;
-								LOG(L_ERR, "ERROR: tcp_send %s: EAGAIN and"
-										" write queue full or failed for %p\n",
-										su2a(&dst->to, sizeof(dst->to)),
-										c);
-								goto conn_wait_error;
-							}
-						lock_release(&c->write_lock);
-						/* send to tcp_main */
-						response[0]=(long)c;
-						response[1]=CONN_NEW_PENDING_WRITE;
-						if (unlikely(send_fd(unix_tcp_sock, response, 
-												sizeof(response), fd) <= 0)){
-							LOG(L_ERR, "BUG: tcp_send %s: "
-										"CONN_NEW_PENDING_WRITE  for %p"
-										" failed:" " %s (%d)\n",
-										su2a(&dst->to, sizeof(dst->to)),
-										c, strerror(errno), errno);
-							goto conn_wait_error;
-						}
-						n=len;
-						goto conn_wait_success;
-					}
-					/* if first write failed it's most likely a
-					   connect error */
-					switch(errno){
-						case ENETUNREACH:
-						case EHOSTUNREACH:  /* not posix for send() */
-#ifdef USE_DST_BLACKLIST
-							dst_blacklist_add( BLST_ERR_CONNECT, dst, 0);
-#endif /* USE_DST_BLACKLIST */
-							TCP_EV_CONNECT_UNREACHABLE(errno, TCP_LADDR(c),
-									TCP_LPORT(c), TCP_PSU(c), TCP_PROTO(c));
-							break;
-						case ECONNREFUSED:
-						case ECONNRESET:
-#ifdef USE_DST_BLACKLIST
-							dst_blacklist_add( BLST_ERR_CONNECT, dst, 0);
-#endif /* USE_DST_BLACKLIST */
-							TCP_EV_CONNECT_RST(errno, TCP_LADDR(c),
-									TCP_LPORT(c), TCP_PSU(c), TCP_PROTO(c));
-							break;
-						default:
-							TCP_EV_CONNECT_ERR(errno, TCP_LADDR(c),
-									TCP_LPORT(c), TCP_PSU(c), TCP_PROTO(c));
-					}
-					/* error: destroy it directly */
-					TCP_STATS_CONNECT_FAILED();
-					LOG(L_ERR, "ERROR: tcp_send %s: connect & send "
-										" for %p failed:" " %s (%d)\n",
-										su2a(&dst->to, sizeof(dst->to)),
-										c, strerror(errno), errno);
-					goto conn_wait_error;
-				}
-				LOG(L_INFO, "tcp_send: quick connect for %p\n", c);
-				TCP_STATS_ESTABLISHED(S_CONN_CONNECT);
-				if (unlikely(dst->send_flags.f & SND_F_CON_CLOSE)){
-					/* if close-after-send requested, don't bother
-					   sending the fd back to tcp_main, try closing it
-					   immediately (no other tcp_send should use it,
-					   because it is marked as close-after-send before
-					   being added to the hash */
-					goto conn_wait_close;
-				}
-				c->state=S_CONN_OK;
-				/* send to tcp_main */
-				response[0]=(long)c;
-				response[1]=CONN_NEW_COMPLETE;
-				if (unlikely(send_fd(unix_tcp_sock, response, 
-										sizeof(response), fd) <= 0)){
-					LOG(L_ERR, "BUG: tcp_send %s: CONN_NEW_COMPLETE  for %p"
-								" failed:" " %s (%d)\n",
-								su2a(&dst->to, sizeof(dst->to)),
-								c, strerror(errno), errno);
-					goto conn_wait_error;
-				}
-				goto conn_wait_success;
+		if (likely(cfg_get(tcp, tcp_cfg, tcp_connect_wait) && 
+					cfg_get(tcp, tcp_cfg, async) )){
+			if (unlikely(*tcp_connections_no >=
+							cfg_get(tcp, tcp_cfg, max_connections))){
+				LOG(L_ERR, "ERROR: tcp_send %s: maximum number of"
+							" connections exceeded (%d/%d)\n",
+							su2a(&dst->to, sizeof(dst->to)),
+							*tcp_connections_no,
+							cfg_get(tcp, tcp_cfg, max_connections));
+				return -1;
 			}
 			}
-#endif /* TCP_CONNECT_WAIT  && TCP_ASYNC */
-			if (unlikely((c=tcpconn_connect(&dst->to, from, dst->proto,
-											&dst->send_flags))==0)){
-				LOG(L_ERR, "ERROR: tcp_send %s: connect failed\n",
-								su2a(&dst->to, sizeof(dst->to)));
+			c=tcpconn_new(-1, &dst->to, from, 0, dst->proto,
+							S_CONN_CONNECT);
+			if (unlikely(c==0)){
+				LOG(L_ERR, "ERROR: tcp_send %s: could not create new"
+						" connection\n",
+						su2a(&dst->to, sizeof(dst->to)));
 				return -1;
 				return -1;
 			}
 			}
+			c->flags|=F_CONN_PENDING|F_CONN_FD_CLOSED;
 			tcpconn_set_send_flags(c, dst->send_flags);
 			tcpconn_set_send_flags(c, dst->send_flags);
-			if (likely(c->state==S_CONN_OK))
-				TCP_STATS_ESTABLISHED(S_CONN_CONNECT);
-			atomic_set(&c->refcnt, 2); /* ref. from here and it will also
-			                              be added in the tcp_main hash */
-			fd=c->s;
-			c->flags|=F_CONN_FD_CLOSED; /* not yet opened in main */
-			/* ? TODO: it might be faster just to queue the write and
-			 * send to main a CONN_NEW_PENDING_WRITE */
-			
-			/* send the new tcpconn to "tcp main" */
-			response[0]=(long)c;
-			response[1]=CONN_NEW;
-			n=send_fd(unix_tcp_sock, response, sizeof(response), c->s);
-			if (unlikely(n<=0)){
-				LOG(L_ERR, "BUG: tcp_send %s: failed send_fd: %s (%d)\n",
-						su2a(&dst->to, sizeof(dst->to)),
-						strerror(errno), errno);
-				/* we can safely delete it, it's not referenced by anybody */
+			atomic_set(&c->refcnt, 2); /* ref from here and from main hash
+										 table */
+			/* add it to id hash and aliases */
+			if (unlikely(tcpconn_add(c)==0)){
+				LOG(L_ERR, "ERROR: tcp_send %s: could not add "
+							"connection %p\n",
+							su2a(&dst->to, sizeof(dst->to)),
+								c);
 				_tcpconn_free(c);
 				_tcpconn_free(c);
 				n=-1;
 				n=-1;
 				goto end_no_conn;
 				goto end_no_conn;
 			}
 			}
-			/* new connection => send on it directly */
-			n = tcpconn_do_send(fd, c, buf, len, dst->send_flags,
+			/* do connect and if src ip or port changed, update the 
+			 * aliases */
+			if (unlikely((fd=tcpconn_finish_connect(c, from))<0)){
+				/* tcpconn_finish_connect will automatically blacklist
+				   on error => no need to do it here */
+				LOG(L_ERR, "ERROR: tcp_send %s: tcpconn_finish_connect(%p)"
+						" failed\n", su2a(&dst->to, sizeof(dst->to)),
+							c);
+				goto conn_wait_error;
+			}
+			/* ? TODO: it might be faster just to queue the write directly
+			 *  and send to main CONN_NEW_PENDING_WRITE */
+			/* delay sending the fd to main after the send */
+			
+			/* NOTE: no lock here, because the connection is marked as
+			 * pending and nobody else will try to write on it. However
+			 * this might produce out-of-order writes. If this is not
+			 * desired either lock before the write or use 
+			 * _wbufq_insert(...) */
+#ifdef USE_TLS
+			if (unlikely(c->type==PROTO_TLS))
+				n=tls_1st_send(fd, c, buf, len, dst->send_flags,
 									&response[1]);
 									&response[1]);
-			if (unlikely(response[1] != CONN_NOP)) {
-				response[0]=(long)c;
-				if (send_all(unix_tcp_sock, response, sizeof(response)) <= 0) {
-					BUG("tcp_main command %ld sending failed (write):"
-							"%s (%d)\n", response[1], strerror(errno), errno);
-					/* all commands != CONN_NOP returned by tcpconn_do_send()
-					   (CONN_EOF, CONN_ERROR, CONN_QUEUED_WRITE) will auto-dec
-					   refcnt => if sending the command fails we have to
-					   dec. refcnt by hand */
-					tcpconn_chld_put(c); /* deref. it manually */
-					n=-1;
-				}
-				/* here refcnt for c is already decremented => c contents can
-				   no longer be used and refcnt _must_ _not_ be decremented
-				   again on exit */
-				if (unlikely(n < 0 || response[1] == CONN_EOF)) {
-					/* on error or eof, close fd */
-					close(fd);
-				} else if (response[1] == CONN_QUEUED_WRITE) {
+			else
+#endif /* USE_TLS */
+				n=tcpconn_1st_send(fd, c, buf, len, dst->send_flags,
+									&response[1], 0);
+			if (unlikely(n<0))
+				goto conn_wait_error;
+			if (unlikely(response[1]==CONN_EOF)){
+				/* if close-after-send requested, don't bother
+				   sending the fd back to tcp_main, try closing it
+				   immediately (no other tcp_send should use it,
+				   because it is marked as close-after-send before
+				   being added to the hash */
+				goto conn_wait_close;
+			}
+			/* send to tcp_main */
+			response[0]=(long)c;
+			if (unlikely(response[1]!=CONN_NOP &&
+						(send_fd(unix_tcp_sock, response,
+									sizeof(response), fd) <= 0))){
+				LOG(L_ERR, "BUG: tcp_send %s: %ld for %p"
+							" failed:" " %s (%d)\n",
+							su2a(&dst->to, sizeof(dst->to)),
+							response[1], c, strerror(errno), errno);
+				goto conn_wait_error;
+			}
+			goto conn_wait_success;
+		}
+#endif /* TCP_CONNECT_WAIT  && TCP_ASYNC */
+		if (unlikely((c=tcpconn_connect(&dst->to, from, dst->proto,
+										&dst->send_flags))==0)){
+			LOG(L_ERR, "ERROR: tcp_send %s: connect failed\n",
+							su2a(&dst->to, sizeof(dst->to)));
+			return -1;
+		}
+		tcpconn_set_send_flags(c, dst->send_flags);
+		if (likely(c->state==S_CONN_OK))
+			TCP_STATS_ESTABLISHED(S_CONN_CONNECT);
+		atomic_set(&c->refcnt, 2); /* ref. from here and it will also
+									  be added in the tcp_main hash */
+		fd=c->s;
+		c->flags|=F_CONN_FD_CLOSED; /* not yet opened in main */
+		/* ? TODO: it might be faster just to queue the write and
+		 * send to main a CONN_NEW_PENDING_WRITE */
+		
+		/* send the new tcpconn to "tcp main" */
+		response[0]=(long)c;
+		response[1]=CONN_NEW;
+		n=send_fd(unix_tcp_sock, response, sizeof(response), c->s);
+		if (unlikely(n<=0)){
+			LOG(L_ERR, "BUG: tcp_send %s: failed send_fd: %s (%d)\n",
+					su2a(&dst->to, sizeof(dst->to)),
+					strerror(errno), errno);
+			/* we can safely delete it, it's not referenced by anybody */
+			_tcpconn_free(c);
+			n=-1;
+			goto end_no_conn;
+		}
+		/* new connection => send on it directly */
+#ifdef USE_TLS
+		if (unlikely(c->type==PROTO_TLS)) {
+			response[1] = CONN_ERROR; /* in case tls is not loaded */
+			n = tls_do_send(fd, c, buf, len, dst->send_flags,
+							&response[1]);
+		} else
+#endif /* USE_TLS */
+			n = tcpconn_do_send(fd, c, buf, len, dst->send_flags,
+									&response[1], 0);
+		if (unlikely(response[1] != CONN_NOP)) {
+			response[0]=(long)c;
+			if (send_all(unix_tcp_sock, response, sizeof(response)) <= 0) {
+				BUG("tcp_main command %ld sending failed (write):"
+						"%s (%d)\n", response[1], strerror(errno), errno);
+				/* all commands != CONN_NOP returned by tcpconn_do_send()
+				   (CONN_EOF, CONN_ERROR, CONN_QUEUED_WRITE) will auto-dec
+				   refcnt => if sending the command fails we have to
+				   dec. refcnt by hand */
+				tcpconn_chld_put(c); /* deref. it manually */
+				n=-1;
+			}
+			/* here refcnt for c is already decremented => c contents can
+			   no longer be used and refcnt _must_ _not_ be decremented
+			   again on exit */
+			if (unlikely(n < 0 || response[1] == CONN_EOF)) {
+				/* on error or eof, close fd */
+				close(fd);
+			} else if (response[1] == CONN_QUEUED_WRITE) {
 #ifdef TCP_FD_CACHE
 #ifdef TCP_FD_CACHE
-					if (cfg_get(tcp, tcp_cfg, fd_cache)) {
-						tcp_fd_cache_add(c, fd);
-					} else
+				if (cfg_get(tcp, tcp_cfg, fd_cache)) {
+					tcp_fd_cache_add(c, fd);
+				} else
 #endif /* TCP_FD_CACHE */
 #endif /* TCP_FD_CACHE */
-						close(fd);
-				} else {
-					BUG("unexpected tcpconn_do_send() return & response:"
-							" %d, %ld\n", n, response[1]);
-				}
-				goto end_no_deref;
+					close(fd);
+			} else {
+				BUG("unexpected tcpconn_do_send() return & response:"
+						" %d, %ld\n", n, response[1]);
 			}
 			}
+			goto end_no_deref;
+		}
 #ifdef TCP_FD_CACHE
 #ifdef TCP_FD_CACHE
-			if (cfg_get(tcp, tcp_cfg, fd_cache)) {
-				tcp_fd_cache_add(c, fd);
-			}else
+		if (cfg_get(tcp, tcp_cfg, fd_cache)) {
+			tcp_fd_cache_add(c, fd);
+		}else
 #endif /* TCP_FD_CACHE */
 #endif /* TCP_FD_CACHE */
-				close(fd);
-		/* here we can have only commands that _do_ _not_ dec refcnt.
-		   (CONN_EOF, CON_ERROR, CON_QUEUED_WRITE are all treated above) */
-			goto release_c;
-		}
-/* get_fd: */
+			close(fd);
+	/* here we can have only commands that _do_ _not_ dec refcnt.
+	   (CONN_EOF, CON_ERROR, CON_QUEUED_WRITE are all treated above) */
+		goto release_c;
+	} /* if (c==0 or unusable) new connection */
 	/* existing connection, send on it */
 	/* existing connection, send on it */
 	n = tcpconn_send_put(c, buf, len, dst->send_flags);
 	n = tcpconn_send_put(c, buf, len, dst->send_flags);
 	/* no deref needed (automatically done inside tcpconn_send_put() */
 	/* no deref needed (automatically done inside tcpconn_send_put() */
@@ -2067,7 +2004,7 @@ end_no_conn:
 
 
 
 
 
 
-/** sends on an existing tcpconn.
+/** sends on an existing tcpconn and auto-dec. con. ref counter.
  * As opposed to tcp_send(), this function requires an existing
  * As opposed to tcp_send(), this function requires an existing
  * tcp connection.
  * tcp connection.
  * WARNING: the tcp_connection will be de-referenced.
  * WARNING: the tcp_connection will be de-referenced.
@@ -2183,7 +2120,13 @@ static int tcpconn_send_put(struct tcp_connection* c, char* buf, unsigned len,
 			DBG("tcp_send: after receive_fd: c= %p n=%d fd=%d\n",c, n, fd);
 			DBG("tcp_send: after receive_fd: c= %p n=%d fd=%d\n",c, n, fd);
 		}
 		}
 	
 	
-	n = tcpconn_do_send(fd, c, buf, len, send_flags, &response[1]);
+#ifdef USE_TLS
+		if (unlikely(c->type==PROTO_TLS)) {
+			response[1] = CONN_ERROR; /* in case tls is not loaded */
+			n = tls_do_send(fd, c, buf, len, send_flags, &response[1]);
+		} else
+#endif
+			n = tcpconn_do_send(fd, c, buf, len, send_flags, &response[1], 0);
 	if (unlikely(response[1] != CONN_NOP)) {
 	if (unlikely(response[1] != CONN_NOP)) {
 error:
 error:
 		response[0]=(long)c;
 		response[0]=(long)c;
@@ -2239,9 +2182,57 @@ release_c:
 
 
 
 
 
 
+/* unsafe send on a known tcp connection.
+ * Directly send on a known tcp connection with a given fd.
+ * It is assumed that the connection locks are already held.
+ * Side effects: if needed it will send state update commands to
+ *  tcp_main (e.g. CON_EOF, CON_ERROR, CON_QUEUED_WRITE).
+ * @param fd - fd used for sending.
+ * @param c - existing tcp connection pointer (state and flags might be
+ *            changed).
+ * @param buf - data to be sent.
+ * @param len - data length.
+ * @param send_flags
+ * @return <0 on error, number of bytes sent on success.
+ */
+int tcpconn_send_unsafe(int fd, struct tcp_connection *c,
+						char* buf, unsigned len, snd_flags_t send_flags)
+{
+	int n;
+	long response[2];
+	
+	n = tcpconn_do_send(fd, c, buf, len, send_flags, &response[1], 1);
+	if (unlikely(response[1] != CONN_NOP)) {
+		/* all commands != CONN_NOP returned by tcpconn_do_send()
+		   (CONN_EOF, CONN_ERROR, CONN_QUEUED_WRITE) will auto-dec refcnt
+		   => increment it (we don't want the connection to be destroyed
+		   from under us)
+		 */
+		atomic_inc(&c->refcnt);
+		response[0]=(long)c;
+		if (send_all(unix_tcp_sock, response, sizeof(response)) <= 0) {
+			BUG("connection %p command %ld sending failed (write):%s (%d)\n",
+					c, response[1], strerror(errno), errno);
+			/* send failed => deref. it back by hand */
+			tcpconn_chld_put(c); 
+			n=-1;
+		}
+		/* here refcnt for c is already decremented => c contents can no
+		   longer be used and refcnt _must_ _not_ be decremented again
+		   on exit */
+		return n;
+	}
+	return n;
+}
+
+
+
 /** lower level send (connection and fd should be known).
 /** lower level send (connection and fd should be known).
  * It takes care of possible write-queueing, blacklisting a.s.o.
  * It takes care of possible write-queueing, blacklisting a.s.o.
  * It expects a valid tcp connection. It doesn't touch the ref. cnts.
  * It expects a valid tcp connection. It doesn't touch the ref. cnts.
+ * It will also set the connection flags from send_flags (it's better
+ * to do it here, because it's guaranteed to be under lock).
+ * @param fd - fd used for sending.
  * @param c - existing tcp connection pointer (state and flags might be
  * @param c - existing tcp connection pointer (state and flags might be
  *            changed).
  *            changed).
  * @param buf - data to be sent.
  * @param buf - data to be sent.
@@ -2255,12 +2246,15 @@ release_c:
  *                      CONN_QUEUED_WRITE - new write queue (connection
  *                      CONN_QUEUED_WRITE - new write queue (connection
  *                                 should be watched for write and the wr.
  *                                 should be watched for write and the wr.
  *                                 queue flushed).
  *                                 queue flushed).
+ * @param locked - if set assume the connection is already locked (call from
+ *                  tls) and do not lock/unlock the connection.
  * @return >=0 on success, < 0 on error && *resp == CON_ERROR.
  * @return >=0 on success, < 0 on error && *resp == CON_ERROR.
  *
  *
  */
  */
-static int tcpconn_do_send(int fd, struct tcp_connection* c,
+int tcpconn_do_send(int fd, struct tcp_connection* c,
 							char* buf, unsigned len,
 							char* buf, unsigned len,
-							snd_flags_t send_flags, long* resp)
+							snd_flags_t send_flags, long* resp,
+							int locked)
 {
 {
 	int  n;
 	int  n;
 #ifdef TCP_ASYNC
 #ifdef TCP_ASYNC
@@ -2269,7 +2263,7 @@ static int tcpconn_do_send(int fd, struct tcp_connection* c,
 
 
 	DBG("tcp_send: sending...\n");
 	DBG("tcp_send: sending...\n");
 	*resp = CONN_NOP;
 	*resp = CONN_NOP;
-	lock_get(&c->write_lock);
+	if (likely(!locked)) lock_get(&c->write_lock);
 	/* update connection send flags with the current ones */
 	/* update connection send flags with the current ones */
 	tcpconn_set_send_flags(c, send_flags);
 	tcpconn_set_send_flags(c, send_flags);
 #ifdef TCP_ASYNC
 #ifdef TCP_ASYNC
@@ -2280,37 +2274,32 @@ static int tcpconn_do_send(int fd, struct tcp_connection* c,
 #endif /* TCP_CONNECT_WAIT */
 #endif /* TCP_CONNECT_WAIT */
 			){
 			){
 			if (unlikely(_wbufq_add(c, buf, len)<0)){
 			if (unlikely(_wbufq_add(c, buf, len)<0)){
-				lock_release(&c->write_lock);
+				if (likely(!locked)) lock_release(&c->write_lock);
 				n=-1;
 				n=-1;
 				goto error;
 				goto error;
 			}
 			}
-			lock_release(&c->write_lock);
+			if (likely(!locked)) lock_release(&c->write_lock);
 			n=len;
 			n=len;
 			goto end;
 			goto end;
 		}
 		}
 		n=_tcpconn_write_nb(fd, c, buf, len);
 		n=_tcpconn_write_nb(fd, c, buf, len);
 	}else{
 	}else{
 #endif /* TCP_ASYNC */
 #endif /* TCP_ASYNC */
-#ifdef USE_TLS
-		if (c->type==PROTO_TLS)
-			n=tls_blocking_write(c, fd, buf, len);
-		else
-#endif
 		/* n=tcp_blocking_write(c, fd, buf, len); */
 		/* n=tcp_blocking_write(c, fd, buf, len); */
-			n=tsend_stream(fd, buf, len,
-							TICKS_TO_S(cfg_get(tcp, tcp_cfg, send_timeout)) *
-							1000);
+		n=tsend_stream(fd, buf, len,
+						TICKS_TO_S(cfg_get(tcp, tcp_cfg, send_timeout)) *
+						1000);
 #ifdef TCP_ASYNC
 #ifdef TCP_ASYNC
 	}
 	}
 #else /* ! TCP_ASYNC */
 #else /* ! TCP_ASYNC */
-	lock_release(&c->write_lock);
+	if (likely(!locked)) lock_release(&c->write_lock);
 #endif /* TCP_ASYNC */
 #endif /* TCP_ASYNC */
 	
 	
 	DBG("tcp_send: after real write: c= %p n=%d fd=%d\n",c, n, fd);
 	DBG("tcp_send: after real write: c= %p n=%d fd=%d\n",c, n, fd);
 	DBG("tcp_send: buf=\n%.*s\n", (int)len, buf);
 	DBG("tcp_send: buf=\n%.*s\n", (int)len, buf);
 	if (unlikely(n<(int)len)){
 	if (unlikely(n<(int)len)){
 #ifdef TCP_ASYNC
 #ifdef TCP_ASYNC
-		if (cfg_get(tcp, tcp_cfg, async) && 
+		if (cfg_get(tcp, tcp_cfg, async) &&
 				((n>=0) || errno==EAGAIN || errno==EWOULDBLOCK)){
 				((n>=0) || errno==EAGAIN || errno==EWOULDBLOCK)){
 			enable_write_watch=_wbufq_empty(c);
 			enable_write_watch=_wbufq_empty(c);
 			if (n<0) n=0;
 			if (n<0) n=0;
@@ -2320,17 +2309,17 @@ static int tcpconn_do_send(int fd, struct tcp_connection* c,
 				c->state=S_CONN_OK; /* something was written */
 				c->state=S_CONN_OK; /* something was written */
 			}
 			}
 			if (unlikely(_wbufq_add(c, buf+n, len-n)<0)){
 			if (unlikely(_wbufq_add(c, buf+n, len-n)<0)){
-				lock_release(&c->write_lock);
+				if (likely(!locked)) lock_release(&c->write_lock);
 				n=-1;
 				n=-1;
 				goto error;
 				goto error;
 			}
 			}
-			lock_release(&c->write_lock);
+			if (likely(!locked)) lock_release(&c->write_lock);
 			n=len;
 			n=len;
 			if (likely(enable_write_watch))
 			if (likely(enable_write_watch))
 				*resp=CONN_QUEUED_WRITE;
 				*resp=CONN_QUEUED_WRITE;
 			goto end;
 			goto end;
 		}else{
 		}else{
-			lock_release(&c->write_lock);
+			if (likely(!locked)) lock_release(&c->write_lock);
 		}
 		}
 #endif /* TCP_ASYNC */
 #endif /* TCP_ASYNC */
 		if (unlikely(c->state==S_CONN_CONNECT)){
 		if (unlikely(c->state==S_CONN_CONNECT)){
@@ -2390,7 +2379,7 @@ error:
 	}
 	}
 	
 	
 #ifdef TCP_ASYNC
 #ifdef TCP_ASYNC
-	lock_release(&c->write_lock);
+	if (likely(!locked)) lock_release(&c->write_lock);
 #endif /* TCP_ASYNC */
 #endif /* TCP_ASYNC */
 	/* in non-async mode here we're either in S_CONN_OK or S_CONN_ACCEPT*/
 	/* in non-async mode here we're either in S_CONN_OK or S_CONN_ACCEPT*/
 	if (unlikely(c->state==S_CONN_CONNECT || c->state==S_CONN_ACCEPT)){
 	if (unlikely(c->state==S_CONN_CONNECT || c->state==S_CONN_ACCEPT)){
@@ -2411,6 +2400,121 @@ end:
 
 
 
 
 
 
+/** low level 1st send on a new connection.
+ * It takes care of possible write-queueing, blacklisting a.s.o.
+ * It expects a valid just-opened tcp connection. It doesn't touch the 
+ * ref. counters. It's used only in the async first send case.
+ * @param fd - fd used for sending.
+ * @param c - existing tcp connection pointer (state and flags might be
+ *            changed). The connection must be new (no previous send on it).
+ * @param buf - data to be sent.
+ * @param len - data length.
+ * @param send_flags
+ * @param resp - filled with a fd sending cmd. for tcp_main on success:
+ *                      CONN_NOP - nothing needs to be done (unused right now).
+ *                      CONN_NEW_PENDING_WRITE - new connection, first write
+ *                                 was partially successful (or EAGAIN) and
+ *                                 was queued (connection should be watched
+ *                                 for write and the write queue flushed).
+ *                                 The fd should be sent to tcp_main.
+ *                      CONN_NEW_COMPLETE - new connection, first write
+ *                                 completed successfuly and no data is queued.
+ *                                 The fd should be sent to tcp_main.
+ *                      CONN_EOF - no error, but the connection should be
+ *                                  closed (e.g. SND_F_CON_CLOSE send flag).
+ * @param locked - if set assume the connection is already locked (call from
+ *                  tls) and do not lock/unlock the connection.
+ * @return >=0 on success, < 0 on error (on error *resp is undefined).
+ *
+ */
+int tcpconn_1st_send(int fd, struct tcp_connection* c,
+							char* buf, unsigned len,
+							snd_flags_t send_flags, long* resp,
+							int locked)
+{
+	int n;
+	
+	n=_tcpconn_write_nb(fd, c, buf, len);
+	if (unlikely(n<(int)len)){
+		if ((n>=0) || errno==EAGAIN || errno==EWOULDBLOCK){
+			DBG("pending write on new connection %p "
+				" (%d/%d bytes written)\n", c, n, len);
+			if (unlikely(n<0)) n=0;
+			else{
+				TCP_STATS_ESTABLISHED(S_CONN_CONNECT);
+				c->state=S_CONN_OK; /* partial write => connect()
+												ended */
+			}
+			/* add to the write queue */
+			if (likely(!locked)) lock_get(&c->write_lock);
+				if (unlikely(_wbufq_insert(c, buf+n, len-n)<0)){
+					if (likely(!locked)) lock_release(&c->write_lock);
+					n=-1;
+					LOG(L_ERR, "%s: EAGAIN and"
+							" write queue full or failed for %p\n",
+							su2a(&c->rcv.src_su, sizeof(c->rcv.src_su)), c);
+					goto error;
+				}
+			if (likely(!locked)) lock_release(&c->write_lock);
+			/* send to tcp_main */
+			*resp=CONN_NEW_PENDING_WRITE;
+			n=len;
+			goto end;
+		}
+		/* n < 0 and not EAGAIN => write error */
+		/* if first write failed it's most likely a
+		   connect error */
+		switch(errno){
+			case ENETUNREACH:
+			case EHOSTUNREACH:  /* not posix for send() */
+#ifdef USE_DST_BLACKLIST
+				dst_blacklist_su( BLST_ERR_CONNECT, c->rcv.proto,
+									&c->rcv.src_su, &c->send_flags, 0);
+#endif /* USE_DST_BLACKLIST */
+				TCP_EV_CONNECT_UNREACHABLE(errno, TCP_LADDR(c),
+								TCP_LPORT(c), TCP_PSU(c), TCP_PROTO(c));
+				break;
+			case ECONNREFUSED:
+			case ECONNRESET:
+#ifdef USE_DST_BLACKLIST
+				dst_blacklist_su( BLST_ERR_CONNECT, c->rcv.proto,
+									&c->rcv.src_su, &c->send_flags, 0);
+#endif /* USE_DST_BLACKLIST */
+				TCP_EV_CONNECT_RST(errno, TCP_LADDR(c),
+								TCP_LPORT(c), TCP_PSU(c), TCP_PROTO(c));
+				break;
+			default:
+				TCP_EV_CONNECT_ERR(errno, TCP_LADDR(c),
+								TCP_LPORT(c), TCP_PSU(c), TCP_PROTO(c));
+		}
+		/* error: destroy it directly */
+		TCP_STATS_CONNECT_FAILED();
+		LOG(L_ERR, "%s: connect & send  for %p failed:" " %s (%d)\n",
+					su2a(&c->rcv.src_su, sizeof(c->rcv.src_su)),
+					c, strerror(errno), errno);
+		goto error;
+	}
+	LOG(L_INFO, "quick connect for %p\n", c);
+	TCP_STATS_ESTABLISHED(S_CONN_CONNECT);
+	if (unlikely(send_flags.f & SND_F_CON_CLOSE)){
+		/* close after write =>  EOF => close immediately */
+		c->state=S_CONN_BAD;
+		/* tell our caller that it should drop this*/
+		*resp=CONN_EOF;
+	}else{
+		c->state=S_CONN_OK;
+		/* send to tcp_main */
+		*resp=CONN_NEW_COMPLETE;
+	}
+end:
+	return n; /* >= 0 */
+error:
+	*resp=CONN_ERROR;
+	return -1;
+}
+
+
+
 int tcp_init(struct socket_info* sock_info)
 int tcp_init(struct socket_info* sock_info)
 {
 {
 	union sockaddr_union* addr;
 	union sockaddr_union* addr;
@@ -2562,7 +2666,6 @@ inline static void tcpconn_close_main_fd(struct tcp_connection* tcpconn)
 	
 	
 	fd=tcpconn->s;
 	fd=tcpconn->s;
 #ifdef USE_TLS
 #ifdef USE_TLS
-	/*FIXME: lock ->writelock ? */
 	if (tcpconn->type==PROTO_TLS)
 	if (tcpconn->type==PROTO_TLS)
 		tls_close(tcpconn, fd);
 		tls_close(tcpconn, fd);
 #endif
 #endif
@@ -2609,7 +2712,7 @@ inline static int tcpconn_chld_put(struct tcp_connection* tcpconn)
 
 
 
 
 /* simple destroy function (the connection should be already removed
 /* simple destroy function (the connection should be already removed
- * from the hashes and the fds should not be watched anymore for IO)
+ * from the hashes. refcnt 0 and the fds should not be watched anymore for IO)
  */
  */
 inline static void tcpconn_destroy(struct tcp_connection* tcpconn)
 inline static void tcpconn_destroy(struct tcp_connection* tcpconn)
 {
 {
@@ -2879,19 +2982,13 @@ inline static void send_fd_queue_run(struct tcp_send_fd_q* q)
  * while holding  c->write_lock). The fd should be non-blocking.
  * while holding  c->write_lock). The fd should be non-blocking.
  *  returns number of bytes written on success, -1 on error (and sets errno)
  *  returns number of bytes written on success, -1 on error (and sets errno)
  */
  */
-inline static int _tcpconn_write_nb(int fd, struct tcp_connection* c,
+int _tcpconn_write_nb(int fd, struct tcp_connection* c,
 									char* buf, int len)
 									char* buf, int len)
 {
 {
 	int n;
 	int n;
 	
 	
 again:
 again:
-#ifdef USE_TLS
-	if (unlikely(c->type==PROTO_TLS))
-		/* FIXME: tls_nonblocking_write !! */
-		n=tls_blocking_write(c, fd, buf, len);
-	else
-#endif /* USE_TLS */
-		n=send(fd, buf, len,
+	n=send(fd, buf, len,
 #ifdef HAVE_MSG_NOSIGNAL
 #ifdef HAVE_MSG_NOSIGNAL
 					MSG_NOSIGNAL
 					MSG_NOSIGNAL
 #else
 #else

+ 94 - 50
tcp_read.c

@@ -42,13 +42,14 @@
  *              reads (sock. buffer empty) (andrei)
  *              reads (sock. buffer empty) (andrei)
  * 2009-02-26  direct blacklist support (andrei)
  * 2009-02-26  direct blacklist support (andrei)
  * 2009-04-09  tcp ev and tcp stats macros added (andrei)
  * 2009-04-09  tcp ev and tcp stats macros added (andrei)
+ * 2010-05-14  split tcp_read() into tcp_read() and tcp_read_data() (andrei)
+ * 2010-05-17  new RD_CONN_REPEAT_READ flag, used by the tls hooks (andrei)
  */
  */
 
 
-/*!
- * \file
- * \brief SIP-router core :: 
- * \ingroup core
- * Module: \ref core
+/** tcp readers processes, tcp read and pre-parse msg. functions.
+ * @file tcp_read.c
+ * @ingroup core
+ * Module: @ref core
  */
  */
 
 
 #ifdef USE_TCP
 #ifdef USE_TCP
@@ -69,6 +70,7 @@
 
 
 #include "dprint.h"
 #include "dprint.h"
 #include "tcp_conn.h"
 #include "tcp_conn.h"
+#include "tcp_read.h"
 #include "tcp_stats.h"
 #include "tcp_stats.h"
 #include "tcp_ev.h"
 #include "tcp_ev.h"
 #include "pass_fd.h"
 #include "pass_fd.h"
@@ -103,10 +105,6 @@ int is_msg_complete(struct tcp_req* r);
 
 
 #define TCPCONN_TIMEOUT_MIN_RUN  1 /* run the timers each new tick */
 #define TCPCONN_TIMEOUT_MIN_RUN  1 /* run the timers each new tick */
 
 
-#define RD_CONN_SHORT_READ	1
-#define RD_CONN_EOF		2
-#define RD_CONN_FORCE_EOF	65536
-
 /* types used in io_wait* */
 /* types used in io_wait* */
 enum fd_types { F_NONE, F_TCPMAIN, F_TCPCONN };
 enum fd_types { F_NONE, F_TCPMAIN, F_TCPCONN };
 
 
@@ -119,38 +117,43 @@ static struct local_timer tcp_reader_ltimer;
 static ticks_t tcp_reader_prev_ticks;
 static ticks_t tcp_reader_prev_ticks;
 
 
 
 
-/* reads next available bytes
- *   c- tcp connection used for reading, tcp_read changes also c->state on
- *      EOF and c->req.error on read error
- *   * flags - value/result - used to signal a seen or "forced" EOF on the 
+/** reads data from an existing tcp connection.
+ * Side-effects: blacklisting, sets connection state to S_CONN_OK, tcp stats.
+ * @param fd - connection file descriptor
+ * @param c - tcp connection structure. c->state might be changed and
+ *             receive info might be used for blacklisting.
+ * @param buf - buffer where the received data will be stored.
+ * @param b_size - buffer size.
+ * @param flags - value/result - used to signal a seen or "forced" EOF on the
  *     connection (when it is known that no more data will come after the 
  *     connection (when it is known that no more data will come after the 
- *     current socket buffer is emptied )=> return/signal EOF on the first 
+ *     current socket buffer is emptied )=> return/signal EOF on the first
  *     short read (=> don't use it on POLLPRI, as OOB data will cause short
  *     short read (=> don't use it on POLLPRI, as OOB data will cause short
- *      reads even if there are still remaining bytes in the socket buffer)
- * return number of bytes read, 0 on EOF or -1 on error,
+ *     reads even if there are still remaining bytes in the socket buffer)
+ *     input: RD_CONN_FORCE_EOF  - force EOF after the first successful read
+ *                                 (bytes_read >=0 )
+ *     output: RD_CONN_SHORT_READ - if the read exhausted all the bytes
+ *                                  in the socket read buffer.
+ *             RD_CONN_EOF - if EOF detected (0 bytes read) or forced via
+ *                           RD_CONN_FORCE_EOF.
+ *             RD_CONN_REPEAT_READ - the read should be repeated immediately
+ *                                   (used only by the tls code for now).
+ *     Note: RD_CONN_SHORT_READ & RD_CONN_EOF must be cleared
+ *           before calling this function.
+ * @return number of bytes read, 0 on EOF or -1 on error,
  * on EOF it also sets c->state to S_CONN_EOF.
  * on EOF it also sets c->state to S_CONN_EOF.
  * (to distinguish from reads that would block which could return 0)
  * (to distinguish from reads that would block which could return 0)
  * RD_CONN_SHORT_READ is also set in *flags for short reads.
  * RD_CONN_SHORT_READ is also set in *flags for short reads.
- * sets also r->error */
-int tcp_read(struct tcp_connection *c, int* flags)
+ * EOF checking should be done by checking the RD_CONN_EOF flag.
+ */
+int tcp_read_data(int fd, struct tcp_connection *c,
+					char* buf, int b_size, int* flags)
 {
 {
-	int bytes_free, bytes_read;
-	struct tcp_req *r;
-	int fd;
-
-	r=&c->req;
-	fd=c->fd;
-	bytes_free=r->b_size- (int)(r->pos - r->buf);
+	int bytes_read;
 	
 	
-	if (bytes_free==0){
-		LOG(L_ERR, "ERROR: tcp_read: buffer overrun, dropping\n");
-		r->error=TCP_REQ_OVERRUN;
-		return -1;
-	}
 again:
 again:
-	bytes_read=read(fd, r->pos, bytes_free);
-
-	if (likely(bytes_read!=bytes_free)){
+	bytes_read=read(fd, buf, b_size);
+	
+	if (likely(bytes_read!=b_size)){
 		if(unlikely(bytes_read==-1)){
 		if(unlikely(bytes_read==-1)){
 			if (errno == EWOULDBLOCK || errno == EAGAIN){
 			if (errno == EWOULDBLOCK || errno == EAGAIN){
 				bytes_read=0; /* nothing has been read */
 				bytes_read=0; /* nothing has been read */
@@ -194,16 +197,14 @@ again:
 								break;
 								break;
 						}
 						}
 				}
 				}
-				LOG(L_ERR, "ERROR: tcp_read: error reading: %s (%d)\n",
-							strerror(errno), errno);
-				r->error=TCP_READ_ERROR;
+				LOG(L_ERR, "error reading: %s (%d)\n", strerror(errno), errno);
 				return -1;
 				return -1;
 			}
 			}
 		}else if (unlikely((bytes_read==0) || 
 		}else if (unlikely((bytes_read==0) || 
 					(*flags & RD_CONN_FORCE_EOF))){
 					(*flags & RD_CONN_FORCE_EOF))){
 			c->state=S_CONN_EOF;
 			c->state=S_CONN_EOF;
 			*flags|=RD_CONN_EOF;
 			*flags|=RD_CONN_EOF;
-			DBG("tcp_read: EOF on %p, FD %d\n", c, fd);
+			DBG("EOF on %p, FD %d\n", c, fd);
 		}else{
 		}else{
 			if (unlikely(c->state==S_CONN_CONNECT || c->state==S_CONN_ACCEPT)){
 			if (unlikely(c->state==S_CONN_CONNECT || c->state==S_CONN_ACCEPT)){
 				TCP_STATS_ESTABLISHED(c->state);
 				TCP_STATS_ESTABLISHED(c->state);
@@ -218,6 +219,44 @@ again:
 			c->state=S_CONN_OK;
 			c->state=S_CONN_OK;
 		}
 		}
 	}
 	}
+	return bytes_read;
+}
+
+
+
+/* reads next available bytes
+ *   c- tcp connection used for reading, tcp_read changes also c->state on
+ *      EOF and c->req.error on read error
+ *   * flags - value/result - used to signal a seen or "forced" EOF on the 
+ *     connection (when it is known that no more data will come after the 
+ *     current socket buffer is emptied )=> return/signal EOF on the first 
+ *     short read (=> don't use it on POLLPRI, as OOB data will cause short
+ *      reads even if there are still remaining bytes in the socket buffer)
+ * return number of bytes read, 0 on EOF or -1 on error,
+ * on EOF it also sets c->state to S_CONN_EOF.
+ * (to distinguish from reads that would block which could return 0)
+ * RD_CONN_SHORT_READ is also set in *flags for short reads.
+ * sets also r->error */
+int tcp_read(struct tcp_connection *c, int* flags)
+{
+	int bytes_free, bytes_read;
+	struct tcp_req *r;
+	int fd;
+
+	r=&c->req;
+	fd=c->fd;
+	bytes_free=r->b_size- (int)(r->pos - r->buf);
+	
+	if (unlikely(bytes_free==0)){
+		LOG(L_ERR, "ERROR: tcp_read: buffer overrun, dropping\n");
+		r->error=TCP_REQ_OVERRUN;
+		return -1;
+	}
+	bytes_read = tcp_read_data(fd, c, r->pos, bytes_free, flags);
+	if (unlikely(bytes_read < 0)){
+		r->error=TCP_READ_ERROR;
+		return -1;
+	}
 #ifdef EXTRA_DEBUG
 #ifdef EXTRA_DEBUG
 	DBG("tcp_read: read %d bytes:\n%.*s\n", bytes_read, bytes_read, r->pos);
 	DBG("tcp_read: read %d bytes:\n%.*s\n", bytes_read, bytes_read, r->pos);
 #endif
 #endif
@@ -295,7 +334,7 @@ int tcp_read_headers(struct tcp_connection *c, int* read_flags)
 	}else{
 	}else{
 #ifdef USE_TLS
 #ifdef USE_TLS
 		if (unlikely(c->type==PROTO_TLS))
 		if (unlikely(c->type==PROTO_TLS))
-			bytes=tls_read(c); /* FIXME: read_flags support */
+			bytes=tls_read(c, read_flags);
 		else
 		else
 #endif
 #endif
 			bytes=tcp_read(c, read_flags);
 			bytes=tcp_read(c, read_flags);
@@ -653,16 +692,6 @@ int tcp_read_req(struct tcp_connection* con, int* bytes_read, int* read_flags)
 		resp=CONN_RELEASE;
 		resp=CONN_RELEASE;
 		s=con->fd;
 		s=con->fd;
 		req=&con->req;
 		req=&con->req;
-#ifdef USE_TLS
-		if (con->type==PROTO_TLS){
-			ret=tls_fix_read_conn(con);
-			if (unlikely(ret<0)){
-				resp=CONN_ERROR;
-				goto end_req;
-			}else if (unlikely(ret==0))
-				goto end_req; /* not enough data */
-		}
-#endif
 
 
 again:
 again:
 		if (likely(req->error==TCP_REQ_OK)){
 		if (likely(req->error==TCP_REQ_OK)){
@@ -926,6 +955,9 @@ again:
 							con, con->id, atomic_get(&con->refcnt));
 							con, con->id, atomic_get(&con->refcnt));
 				goto con_error;
 				goto con_error;
 			}
 			}
+#ifdef USE_TLS
+repeat_1st_read:
+#endif /* USE_TLS */
 			/* if we received the fd there is most likely data waiting to
 			/* if we received the fd there is most likely data waiting to
 			 * be read => process it first to avoid extra sys calls */
 			 * be read => process it first to avoid extra sys calls */
 			read_flags=((con->flags & (F_CONN_EOF_SEEN|F_CONN_FORCE_EOF)) &&
 			read_flags=((con->flags & (F_CONN_EOF_SEEN|F_CONN_FORCE_EOF)) &&
@@ -940,6 +972,11 @@ again:
 				release_tcpconn(con, resp, tcpmain_sock);
 				release_tcpconn(con, resp, tcpmain_sock);
 				break;
 				break;
 			}
 			}
+#ifdef USE_TLS
+			/* repeat read if requested (for now only tls might do this) */
+			if (unlikely(read_flags & RD_CONN_REPEAT_READ))
+				goto repeat_1st_read;
+#endif /* USE_TLS */
 			
 			
 			/* must be before io_watch_add, io_watch_add might catch some
 			/* must be before io_watch_add, io_watch_add might catch some
 			 * already existing events => might call handle_io and
 			 * already existing events => might call handle_io and
@@ -974,8 +1011,11 @@ again:
 							con, con->id, atomic_get(&con->refcnt));
 							con, con->id, atomic_get(&con->refcnt));
 				goto read_error;
 				goto read_error;
 			}
 			}
+#ifdef USE_TLS
+repeat_read:
+#endif /* USE_TLS */
 #ifdef POLLRDHUP
 #ifdef POLLRDHUP
-			read_flags=(((events & POLLRDHUP) | 
+			read_flags=(((events & POLLRDHUP) |
 							(con->flags & (F_CONN_EOF_SEEN|F_CONN_FORCE_EOF)))
 							(con->flags & (F_CONN_EOF_SEEN|F_CONN_FORCE_EOF)))
 						&& !(events & POLLPRI))? RD_CONN_FORCE_EOF: 0;
 						&& !(events & POLLPRI))? RD_CONN_FORCE_EOF: 0;
 #else /* POLLRDHUP */
 #else /* POLLRDHUP */
@@ -999,11 +1039,15 @@ read_error:
 					con->state=S_CONN_BAD;
 					con->state=S_CONN_BAD;
 				release_tcpconn(con, resp, tcpmain_sock);
 				release_tcpconn(con, resp, tcpmain_sock);
 			}else{
 			}else{
+#ifdef USE_TLS
+				if (unlikely(read_flags & RD_CONN_REPEAT_READ))
+						goto repeat_read;
+#endif /* USE_TLS */
 				/* update timeout */
 				/* update timeout */
 				con->timeout=get_ticks_raw()+S_TO_TICKS(TCP_CHILD_TIMEOUT);
 				con->timeout=get_ticks_raw()+S_TO_TICKS(TCP_CHILD_TIMEOUT);
 				/* ret= 0 (read the whole socket buffer) if short read & 
 				/* ret= 0 (read the whole socket buffer) if short read & 
 				 *  !POLLPRI,  bytes read otherwise */
 				 *  !POLLPRI,  bytes read otherwise */
-				ret&=(((read_flags & RD_CONN_SHORT_READ) && 
+				ret&=(((read_flags & RD_CONN_SHORT_READ) &&
 						!(events & POLLPRI)) - 1);
 						!(events & POLLPRI)) - 1);
 			}
 			}
 			break;
 			break;

+ 46 - 0
tcp_read.h

@@ -0,0 +1,46 @@
+/* 
+ * $Id$
+ * 
+ * Copyright (C) 2010 iptelorg GmbH
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+/** tcp internal read functions.
+ * @file tcp_read.h
+ * @ingroup: core
+ * Module: @ref core
+ */
+/*
+ * History:
+ * --------
+ *  2010-05-18  initial version (andrei)
+*/
+
+#ifndef __tcp_read_h
+#define __tcp_read_h
+
+#include  "tcp_conn.h"
+
+#define RD_CONN_SHORT_READ		1
+#define RD_CONN_EOF				2
+#define RD_CONN_REPEAT_READ		4 /* read should be repeated (more data)
+								   (used so far only by tls) */
+#define RD_CONN_FORCE_EOF		65536
+
+int tcp_read_data(int fd, struct tcp_connection *c,
+					char* buf, int b_size, int* flags);
+
+
+#endif /*__tcp_read_h*/
+
+/* vi: set ts=4 sw=4 tw=79:ai:cindent: */

+ 15 - 16
tls_hooks.h

@@ -21,6 +21,7 @@
  * History:
  * History:
  * --------
  * --------
  *  2007-02-09  created by andrei
  *  2007-02-09  created by andrei
+ *  2010-05-14  new hook interface (better suited for async. tcp) (andrei)
  */
  */
 
 
 /**
 /**
@@ -50,21 +51,18 @@
 
 
 
 
 struct tls_hooks{
 struct tls_hooks{
-	int  (*read)(struct tcp_connection* c);
-	int (*blocking_write)(struct tcp_connection* c, int fd, const char* buf,
-							unsigned int len);
+	int  (*read)(struct tcp_connection* c, int* flags);
+	/* send using tls on a tcp connection */
+	int (*do_send)(int fd, struct tcp_connection* c, const char* buf,
+							unsigned int len, snd_flags_t send_flags,
+							long* resp);
+	/* 1st send using tls on a new async. tcp connection */
+	int (*fst_send)(int fd, struct tcp_connection* c, const char* buf,
+							unsigned int len, snd_flags_t send_flags,
+							long* resp);
 	int  (*on_tcpconn_init)(struct tcp_connection *c, int sock);
 	int  (*on_tcpconn_init)(struct tcp_connection *c, int sock);
 	void (*tcpconn_clean)(struct tcp_connection* c);
 	void (*tcpconn_clean)(struct tcp_connection* c);
 	void (*tcpconn_close)(struct tcp_connection*c , int fd);
 	void (*tcpconn_close)(struct tcp_connection*c , int fd);
-	/* checks if a tls connection is fully established before a read, and if 
-	 * not it runs tls_accept() or tls_connect() as needed
-	 * (tls_accept and tls_connect are deferred to the "reader" process for
-	 *  performance reasons)
-	 * returns 1 if the read can continue, 0 if the connection is not yet
-	 * ready for the read and fix_read_con() should be attempted at a latter
-	 * time and <0 on error.
-	 */
-	int (*fix_read_con)(struct tcp_connection* c);
 	
 	
 	/* per listening socket init, called on ser startup (after modules,
 	/* per listening socket init, called on ser startup (after modules,
 	 *  process table, init() and udp socket initialization)*/
 	 *  process table, init() and udp socket initialization)*/
@@ -100,11 +98,12 @@ extern struct tls_hooks tls_hook;
 
 
 #define tls_tcpconn_init(c, s)	tls_hook_call(on_tcpconn_init, 0, (c), (s))
 #define tls_tcpconn_init(c, s)	tls_hook_call(on_tcpconn_init, 0, (c), (s))
 #define tls_tcpconn_clean(c)	tls_hook_call_v(tcpconn_clean, (c))
 #define tls_tcpconn_clean(c)	tls_hook_call_v(tcpconn_clean, (c))
-#define tls_blocking_write(c, fd, buf, len) \
-	tls_hook_call(blocking_write, -1, (c), (fd), (buf), (len))
+#define tls_do_send(fd, c, buf, len, send_flags, resp) \
+	tls_hook_call(do_send, -1, (fd), (c), (buf), (len), (send_flags), (resp))
+#define tls_1st_send(fd, c, buf, len, send_flags, resp) \
+	tls_hook_call(fst_send, -1, (fd), (c), (buf), (len), (send_flags), (resp))
 #define tls_close(conn, fd)		tls_hook_call_v(tcpconn_close, (conn), (fd))
 #define tls_close(conn, fd)		tls_hook_call_v(tcpconn_close, (conn), (fd))
-#define tls_read(c)				tls_hook_call(read, -1, (c))
-#define tls_fix_read_conn(c)	tls_hook_call(fix_read_con, -1, (c))
+#define tls_read(c, flags)				tls_hook_call(read, -1, (c), (flags))
 
 
 int register_tls_hooks(struct tls_hooks* h);
 int register_tls_hooks(struct tls_hooks* h);