Ver Fonte

- increased ROUTE_MAX_REC_LEV to 100
- added new route commad: send_tcp(address, port)
- tcp global vars (added new members to pt a.s.o)
- tcp_send(buf, len, server, 0) will try to find an existing open tcp
connection to "server"; if none found a new one will be opened (ser will also
start listening on it for sip messages)
- first udp to tcp & tcp to tcp send (not forward, I still have to solve some
tricky via stuff)

Andrei Pelinescu-Onciul há 23 anos atrás
pai
commit
0c5da34bd1
16 ficheiros alterados com 555 adições e 101 exclusões
  1. 18 5
      action.c
  2. 4 0
      cfg.lex
  3. 41 0
      cfg.y
  4. 1 1
      config.h
  5. 19 0
      fifo_server.c
  6. 4 0
      globals.h
  7. 45 0
      ip_addr.h
  8. 69 19
      main.c
  9. 4 0
      pt.h
  10. 2 0
      route.c
  11. 6 0
      route_struct.c
  12. 3 1
      route_struct.h
  13. 11 1
      tcp_conn.h
  14. 260 55
      tcp_main.c
  15. 22 19
      tcp_read.c
  16. 46 0
      tcp_server.h

+ 18 - 5
action.c

@@ -43,6 +43,9 @@
 #include "mem/mem.h"
 #include "globals.h"
 #include "dset.h"
+#ifdef USE_TCP
+#include "tcp_server.h"
+#endif
 
 #include <sys/types.h>
 #include <sys/socket.h>
@@ -133,6 +136,7 @@ int do_action(struct action* a, struct sip_msg* msg)
 			}
 			break;
 		case SEND_T:
+		case SEND_TCP_T:
 			if ((a->p1_type!= PROXY_ST)|(a->p2_type!=NUMBER_ST)){
 				LOG(L_CRIT, "BUG: do_action: bad send() types %d, %d\n",
 						a->p1_type, a->p2_type);
@@ -161,12 +165,21 @@ int do_action(struct action* a, struct sip_msg* msg)
 			if (ret==0){
 				p->tx++;
 				p->tx_bytes+=msg->len;
-				send_sock=get_send_socket(to);
-				if (send_sock!=0){
-					ret=udp_send(send_sock, msg->orig, msg->len, to);
-				}else{
-					ret=-1;
+				if (a->type==SEND_T){
+					/*udp*/
+					send_sock=get_send_socket(to);
+					if (send_sock!=0){
+						ret=udp_send(send_sock, msg->orig, msg->len, to);
+					}else{
+						ret=-1;
+					}
 				}
+#ifdef USE_TCP
+					else{
+					/*tcp*/
+					ret=tcp_send(msg->orig, msg->len, to, 0);
+				}
+#endif
 			}
 			free(to);
 			if (ret<0){

+ 4 - 0
cfg.lex

@@ -41,8 +41,10 @@
 
 /* action keywords */
 FORWARD	forward
+FORWARD_TCP	forward_tcp
 DROP	"drop"|"break"
 SEND	send
+SEND_TCP	send_tcp
 LOG		log
 ERROR	error
 ROUTE	route
@@ -152,8 +154,10 @@ EAT_ABLE	[\ \t\b\r]
 <INITIAL>{EAT_ABLE}	{ count(); }
 
 <INITIAL>{FORWARD}	{count(); yylval.strval=yytext; return FORWARD; }
+<INITIAL>{FORWARD_TCP}	{count(); yylval.strval=yytext; return FORWARD_TCP; }
 <INITIAL>{DROP}	{ count(); yylval.strval=yytext; return DROP; }
 <INITIAL>{SEND}	{ count(); yylval.strval=yytext; return SEND; }
+<INITIAL>{SEND_TCP}	{ count(); yylval.strval=yytext; return SEND_TCP; }
 <INITIAL>{LOG}	{ count(); yylval.strval=yytext; return LOG_TOK; }
 <INITIAL>{ERROR}	{ count(); yylval.strval=yytext; return ERROR; }
 <INITIAL>{SETFLAG}	{ count(); yylval.strval=yytext; return SETFLAG; }

+ 41 - 0
cfg.y

@@ -63,7 +63,9 @@ struct id_list* lst_tmp;
 
 /* keywords */
 %token FORWARD
+%token FORWARD_TCP
 %token SEND
+%token SEND_TCP
 %token DROP
 %token LOG_TOK
 %token ERROR
@@ -720,6 +722,45 @@ cmd:		FORWARD LPAREN host RPAREN	{ $$=mk_action(	FORWARD_T,
 		| SEND error { $$=0; yyerror("missing '(' or ')' ?"); }
 		| SEND LPAREN error RPAREN { $$=0; yyerror("bad send"
 													"argument"); }
+		| SEND_TCP LPAREN host RPAREN	{ $$=mk_action(	SEND_TCP_T,
+													STRING_ST,
+													NUMBER_ST,
+													$3,
+													0);
+									}
+		| SEND_TCP LPAREN STRING RPAREN { $$=mk_action(	SEND_TCP_T,
+													STRING_ST,
+													NUMBER_ST,
+													$3,
+													0);
+									}
+		| SEND_TCP LPAREN ip RPAREN		{ $$=mk_action(	SEND_TCP_T,
+													IP_ST,
+													NUMBER_ST,
+													(void*)$3,
+													0);
+									}
+		| SEND_TCP LPAREN host COMMA NUMBER RPAREN	{ $$=mk_action(	SEND_TCP_T,
+																STRING_ST,
+																NUMBER_ST,
+																$3,
+																(void*)$5);
+												}
+		| SEND_TCP LPAREN STRING COMMA NUMBER RPAREN {$$=mk_action(	SEND_TCP_T,
+																STRING_ST,
+																NUMBER_ST,
+																$3,
+																(void*)$5);
+												}
+		| SEND_TCP LPAREN ip COMMA NUMBER RPAREN { $$=mk_action(	SEND_TCP_T,
+																IP_ST,
+																NUMBER_ST,
+																(void*)$3,
+																(void*)$5);
+											   }
+		| SEND_TCP error { $$=0; yyerror("missing '(' or ')' ?"); }
+		| SEND_TCP LPAREN error RPAREN { $$=0; yyerror("bad send_tcp"
+													"argument"); }
 		| DROP LPAREN RPAREN	{$$=mk_action(DROP_T,0, 0, 0, 0); }
 		| DROP					{$$=mk_action(DROP_T,0, 0, 0, 0); }
 		| LOG_TOK LPAREN STRING RPAREN	{$$=mk_action(	LOG_T, NUMBER_ST, 

+ 1 - 1
config.h

@@ -50,7 +50,7 @@
 #define DEFAULT_RT 0 /* default routing table */
 
 #define MAX_REC_LEV 100 /* maximum number of recursive calls */
-#define ROUTE_MAX_REC_LEV 10 /* maximum number of recursive calls
+#define ROUTE_MAX_REC_LEV 100 /* maximum number of recursive calls
 							   for route()*/
 
 #define MAX_URI_SIZE 1024	/* used when rewriting URIs */

+ 19 - 0
fifo_server.c

@@ -458,6 +458,9 @@ int open_fifo_server()
 {
 	char *t;
 	struct stat filestat;
+#ifdef USE_TCP
+	int sockfd[2];
+#endif
 
 	if (fifo==NULL) {
 		DBG("TM: open_uac_fifo: no fifo will be opened\n");
@@ -502,6 +505,13 @@ int open_fifo_server()
 		return -1;
 	}
 	memcpy(up_since_ctime,t,strlen(t)+1);
+#ifdef USE_TCP
+	if (socketpair(AF_LOCAL, SOCK_STREAM, 0, sockfd)<0){
+			LOG(L_ERR, "ERROR: open_fifo_server: socketpair failed: %s\n",
+				strerror(errno));
+			return -1;
+	}
+#endif
 	process_no++;
 	fifo_pid=fork();
 	if (fifo_pid<0) {
@@ -514,6 +524,10 @@ int open_fifo_server()
 		/* call per-child module initialization too -- some
 		   FIFO commands may need it
 		*/
+#ifdef USE_TCP
+		close(sockfd[0]);
+		unix_tcp_sock=sockfd[1];
+#endif
 		if (init_child(process_no) < 0 ) {
 			LOG(L_ERR, "ERROR: open_uac_fifo: init_child failed\n");
 			return -1;
@@ -539,6 +553,11 @@ int open_fifo_server()
 	/* dad process */
 	pt[process_no].pid=fifo_pid;
 	strncpy(pt[process_no].desc, "fifo server", MAX_PT_DESC );
+#ifdef USE_TCP
+	close(sockfd[1]);
+	pt[process_no].unix_sock=sockfd[0];
+	pt[process_no].idx=-1; /* this is not "tcp" process*/
+#endif
 	/* make sure the read fifo will not close */
 	fifo_write=open(fifo, O_WRONLY, 0);
 	if (fifo_write<0) {

+ 4 - 0
globals.h

@@ -66,6 +66,10 @@ extern struct socket_info* sendipv4; /* ipv4 socket to use when msg.
 										comes from ipv6*/
 extern struct socket_info* sendipv6; /* same as above for ipv6 */
 
+#ifdef USE_TCP
+extern int unix_tcp_sock; /* socket used for communication with tcp main*/
+#endif
+
 extern unsigned int maxbuffer;
 extern int children_no;
 #ifdef USE_TCP

+ 45 - 0
ip_addr.h

@@ -157,6 +157,7 @@ inline static int matchnet(struct ip_addr* ip, struct net* net)
 
 
 
+
 /* inits an ip_addr pointer from a sockaddr structure*/
 static inline void sockaddr2ip_addr(struct ip_addr* ip, struct sockaddr* sa)
 {
@@ -181,6 +182,50 @@ static inline void sockaddr2ip_addr(struct ip_addr* ip, struct sockaddr* sa)
 
 
 
+/* compare 2 ip_addrs (both args are pointers)*/
+#define ip_addr_cmp(ip1, ip2) \
+	(((ip1)->af==(ip2)->af)&& \
+	 	(memcmp((ip1)->u.addr, (ip2)->u.addr, (ip1)->len)==0))
+
+
+
+/* compare 2 sockaddr_unions */
+static inline int su_cmp(union sockaddr_union* s1, union sockaddr_union* s2)
+{
+	if (s1->s.sa_family!=s2->s.sa_family) return 0;
+	switch(s1->s.sa_family){
+		case AF_INET:
+			return (s1->sin.sin_port==s2->sin.sin_port)&&
+					(memcmp(&s1->sin.sin_addr, &s2->sin.sin_addr, 4)==0);
+		case AF_INET6:
+			return (s1->sin6.sin6_port==s2->sin6.sin6_port)&&
+					(memcmp(&s1->sin6.sin6_addr, &s2->sin6.sin6_addr, 16)==0);
+		default:
+			LOG(L_CRIT,"su_cmp: BUG: unknown address family %d\n",
+						s1->s.sa_family);
+			return 0;
+	}
+}
+
+
+
+/* gets the port number */
+static inline short su_getport(union sockaddr_union* su)
+{
+	switch(su->s.sa_family){
+		case AF_INET:
+			return su->sin.sin_port;
+		case AF_INET6:
+			return su->sin6.sin6_port;
+		default:
+			LOG(L_CRIT,"su_get_port: BUG: unknown address family %d\n",
+						su->s.sa_family);
+			return 0;
+	}
+}
+
+
+
 /* inits an ip_addr pointer from a sockaddr_union ip address */
 static inline void su2ip_addr(struct ip_addr* ip, union sockaddr_union* su)
 {

+ 69 - 19
main.c

@@ -528,6 +528,9 @@ int main_loop()
 {
 	int r, i;
 	pid_t pid;
+#ifdef USE_TCP
+	int sockfd[2];
+#endif
 #ifdef WITH_SNMP_MOD
 	int (*snmp_start)();
 
@@ -632,31 +635,25 @@ int main_loop()
 			/* all procs should have access to all the sockets (for sending)
 			 * so we open all first*/
 		}
-#ifdef USE_TCP
-			/* start tcp receivers */
-		if (tcp_init_children()<0) goto error;
-			/* start tcp master proc */
-		process_no++;
-		if ((pid=fork())<0){
-			LOG(L_CRIT, "main_loop: cannot fork tcp main process\n");
-			goto error;
-		}else if (pid==0){
-			/* child */
-			/* is_main=0; */
-			tcp_main_loop();
-		}else{
-			pt[process_no].pid=pid;
-			strncpy(pt[process_no].desc, "tcp main process", MAX_PT_DESC );
-		}
-#endif
 		for(r=0; r<sock_no;r++){
 			for(i=0;i<children_no;i++){
 				process_no++;
+#ifdef USE_TCP
+		 		if (socketpair(AF_LOCAL, SOCK_STREAM, 0, sockfd)<0){
+					LOG(L_ERR, "ERROR: main_loop: socketpair failed: %s\n",
+						strerror(errno));
+					goto error;
+				}
+#endif
 				if ((pid=fork())<0){
 					LOG(L_CRIT,  "main_loop: Cannot fork\n");
 					goto error;
 				}else if (pid==0){
 					     /* child */
+#ifdef USE_TCP
+					close(sockfd[0]);
+					unix_tcp_sock=sockfd[1];
+#endif
 					bind_address=&sock_info[r]; /* shortcut */
 					bind_idx=r;
 					if (init_child(i) < 0) {
@@ -672,6 +669,11 @@ int main_loop()
 						snprintf(pt[process_no].desc, MAX_PT_DESC,
 							"receiver child=%d sock=%d @ %s:%s", i, r, 	
 							sock_info[r].name.s, sock_info[r].port_no_str.s );
+#ifdef USE_TCP
+						close(sockfd[1]);
+						pt[process_no].unix_sock=sockfd[0];
+						pt[process_no].idx=-1; /* this is not "tcp" process*/
+#endif
 				}
 			}
 			/*parent*/
@@ -682,6 +684,7 @@ int main_loop()
 	/*this is the main process*/
 	bind_address=&sock_info[0]; /* main proc -> it shoudln't send anything, */
 	bind_idx=0;					/* if it does it will use the first address */
+	
 	/* if configured to do so, start a server for accepting FIFO commands */
 	if (open_fifo_server()<0) {
 		LOG(L_ERR, "opening fifo server failed\n");
@@ -693,6 +696,13 @@ int main_loop()
 	if (timer_list)
 #endif
 	{
+#ifdef USE_TCP
+ 		if (socketpair(AF_LOCAL, SOCK_STREAM, 0, sockfd)<0){
+			LOG(L_ERR, "ERROR: main_loop: socketpair failed: %s\n",
+				strerror(errno));
+			goto error;
+		}
+#endif
 		/* fork again for the attendant process*/
 		process_no++;
 		if ((pid=fork())<0){
@@ -701,6 +711,10 @@ int main_loop()
 		}else if (pid==0){
 			/* child */
 			/* is_main=0; */
+#ifdef USE_TCP
+			close(sockfd[0]);
+			unix_tcp_sock=sockfd[1];
+#endif
 			for(;;){
 				/* debug:  instead of doing something usefull */
 				/* (placeholder for timers, etc.) */
@@ -711,12 +725,41 @@ int main_loop()
 		}else{
 			pt[process_no].pid=pid;
 			strncpy(pt[process_no].desc, "timer", MAX_PT_DESC );
+#ifdef USE_TCP
+						close(sockfd[1]);
+						pt[process_no].unix_sock=sockfd[0];
+						pt[process_no].idx=-1; /* this is not a "tcp" process*/
+#endif
 		}
 	}
-
+#ifdef USE_TCP
+			/* start tcp receivers */
+		if (tcp_init_children()<0) goto error;
+			/* start tcp master proc */
+		process_no++;
+		if ((pid=fork())<0){
+			LOG(L_CRIT, "main_loop: cannot fork tcp main process\n");
+			goto error;
+		}else if (pid==0){
+			/* child */
+			/* is_main=0; */
+			tcp_main_loop();
+		}else{
+			pt[process_no].pid=pid;
+			strncpy(pt[process_no].desc, "tcp main process", MAX_PT_DESC );
+			pt[process_no].unix_sock=-1;
+			pt[process_no].idx=-1; /* this is not a "tcp" process*/
+			unix_tcp_sock=-1;
+		}
+#endif
 	/* main */
 	pt[0].pid=getpid();
 	strncpy(pt[0].desc, "attendant", MAX_PT_DESC );
+#ifdef USE_TCP
+	pt[process_no].unix_sock=-1;
+	pt[process_no].idx=-1; /* this is not a "tcp" process*/
+	unix_tcp_sock=-1;
+#endif
 	/*DEBUG- remove it*/
 #ifdef DEBUG
 	printf("\n% 3d processes, % 3d children * % 3d listening addresses + main"
@@ -1177,7 +1220,14 @@ try_again:
 		LOG(L_CRIT, "could not initialize timer, exiting...\n");
 		goto error;
 	}
-
+#ifdef USE_TCP
+	/*init tcp*/
+	if (init_tcp()<0){
+		LOG(L_CRIT, "could not initialize tcp, exiting...\n");
+		goto error;
+	}
+#endif
+	
 	/* register a diagnostic FIFO command */
 	if (register_core_fifo()<0) {
 		LOG(L_CRIT, "unable to register core FIFO commands\n");

+ 4 - 0
pt.h

@@ -43,6 +43,10 @@
 
 struct process_table {
 	int pid;
+#ifdef USE_TCP
+	int unix_sock; /* unix socket on which tcp main listens */
+	int idx; /* tcp child index, -1 for other processes */
+#endif
 	char desc[MAX_PT_DESC];
 };
 

+ 2 - 0
route.c

@@ -145,7 +145,9 @@ static int fix_actions(struct action* a)
 	for(t=a; t!=0; t=t->next){
 		switch(t->type){
 			case FORWARD_T:
+			case FORWARD_TCP_T:
 			case SEND_T:
+			case SEND_TCP_T:
 					switch(t->p1_type){
 						case IP_ST: 
 							tmp=strdup(ip_addr2a(

+ 6 - 0
route_struct.c

@@ -232,9 +232,15 @@ void print_action(struct action* a)
 			case FORWARD_T:
 					DBG("forward(");
 					break;
+			case FORWARD_TCP_T:
+					DBG("forward_tcp(");
+					break;
 			case SEND_T:
 					DBG("send(");
 					break;
+			case SEND_TCP_T:
+					DBG("send_tcp(");
+					break;
 			case DROP_T:
 					DBG("drop(");
 					break;

+ 3 - 1
route_struct.h

@@ -54,7 +54,9 @@ enum { FORWARD_T=1, SEND_T, DROP_T, LOG_T, ERROR_T, ROUTE_T, EXEC_T,
 		SETFLAG_T, RESETFLAG_T, ISFLAGSET_T ,
 		LEN_GT_T, PREFIX_T, STRIP_T,
 		APPEND_BRANCH_T,
-		REVERT_URI_T };
+		REVERT_URI_T,
+		FORWARD_TCP_T,
+		SEND_TCP_T};
 enum { NOSUBTYPE=0, STRING_ST, NET_ST, NUMBER_ST, IP_ST, RE_ST, PROXY_ST,
 		EXPR_ST, ACTIONS_ST, CMDF_ST, MODFIXUP_ST, URIHOST_ST, URIPORT_ST,
 		MYSELF_ST };

+ 11 - 1
tcp_conn.h

@@ -50,6 +50,10 @@ enum {	H_SKIP, H_LF, H_LFCR,  H_BODY, H_STARTWS,
 		H_CONT_LEN_BODY, H_CONT_LEN_BODY_PARSE 
 	};
 
+/* fd communication commands */
+enum { CONN_DESTROY=-3, CONN_ERROR=-2, CONN_EOF=-1, CONN_RELEASE, CONN_GET_FD,
+	   CONN_NEW };
+
 struct tcp_req{
 	struct tcp_req* next;
 	/* sockaddr ? */
@@ -71,7 +75,10 @@ struct tcp_req{
 struct tcp_connection{
 	int s; /*socket, used by "tcp main" */
 	int fd; /* used only by "children" */
+	int id; /* id (unique!) used to retrieve a specific connection when
+	           reply-ing*/
 	struct ip_addr ip; /* peer ip */
+	int port; /* peer port */
 	int sock_idx; /* receiving socket index in the tcp_info array */
 	union sockaddr_union su;
 	struct tcp_req req; /* request data */
@@ -116,7 +123,10 @@ struct tcp_connection{
 	}while(0)
 
 
-
+#define TCPCONN_LOCK LOG(L_CRIT, "LOCK not implemented yet: %s : %d: %s\n", \
+							__FILE__, __LINE__, __FUNCTION__);
+#define TCPCONN_UNLOCK LOG(L_CRIT, "UNLOCK not implemented yet: %s: %d: %s\n",\
+							__FILE__, __LINE__, __FUNCTION__);
 
 
 

+ 260 - 55
tcp_main.c

@@ -54,6 +54,7 @@
 #include "mem/mem.h"
 #include "mem/shm_mem.h"
 #include "timer.h"
+#include "tcp_server.h"
 
 
 
@@ -64,23 +65,23 @@
 
 struct tcp_child{
 	pid_t pid;
-	int s; /* unix socket for comm*/
+	int unix_sock; /* unix sock fd, copied from pt*/
 	int busy;
 	int n_reqs; /* number of requests serviced so far */
 };
 
 
-enum { CONN_OK, CONN_ERROR };
 
-
-
-
-struct tcp_connection* conn_list=0;
+struct tcp_connection** conn_list=0;
 struct tcp_child tcp_children[MAX_TCP_CHILDREN];
+static int connection_id=1; /*  unique for each connection, used for 
+								quickly finding the corresponding connection
+								for a reply */
+int unix_tcp_sock;
 
 
 
-struct tcp_connection*  tcpconn_add(int sock, union sockaddr_union* su, int i)
+struct tcp_connection* tcpconn_new(int sock, union sockaddr_union* su, int i)
 {
 	struct tcp_connection *c;
 	
@@ -91,15 +92,15 @@ struct tcp_connection*  tcpconn_add(int sock, union sockaddr_union* su, int i)
 		goto error;
 	}
 	c->s=sock;
+	c->fd=sock;
 	c->su=*su;
 	c->sock_idx=i;
 	c->refcnt=0;
 	su2ip_addr(&c->ip, su);
+	c->port=su_getport(su);
 	init_tcp_req(&c->req);
 	c->timeout=get_ticks()+TCP_CON_TIMEOUT;
-
-	/* add it at the begining of the list*/
-	tcpconn_listadd(conn_list, c, next, prev);
+	c->id=connection_id++;
 	return c;
 	
 error:
@@ -108,27 +109,158 @@ error:
 
 
 
+struct tcp_connection* tcpconn_connect(union sockaddr_union* server)
+{
+	int s;
+
+	s=socket(AF2PF(server->s.sa_family), SOCK_STREAM, 0);
+	if (s<0){
+		LOG(L_ERR, "ERROR: tcpconn_connect: socket: (%d) %s\n",
+				errno, strerror(errno));
+		goto error;
+	}
+	if (connect(s, &server->s, sockaddru_len(*server))<0){
+		LOG(L_ERR, "ERROR: tcpconn_connect: connect: (%d) %s\n",
+				errno, strerror(errno));
+		goto error;
+	}
+	return tcpconn_new(s, server, 0); /*FIXME: set sock idx! */
+error:
+	return 0;
+}
+
+
+
+struct tcp_connection*  tcpconn_add(struct tcp_connection *c)
+{
+	TCPCONN_LOCK;
+	/* add it at the begining of the list*/
+	if (c) tcpconn_listadd(*conn_list, c, next, prev);
+	TCPCONN_UNLOCK;
+	return c;
+}
+
+
+
 void tcpconn_rm(struct tcp_connection* c)
 {
-	tcpconn_listrm(conn_list, c, next, prev);
+	TCPCONN_LOCK;
+	tcpconn_listrm(*conn_list, c, next, prev);
+	TCPCONN_UNLOCK;
 	shm_free(c);
 }
 
 
+/* finds a connection, if id=0 uses the ip addr & port */
+struct tcp_connection* tcpconn_find(int id, struct ip_addr* ip, int port)
+{
+
+	struct tcp_connection *c;
+	
+	DBG("tcpconn_find: %d ",id ); print_ip(ip); DBG(" %d\n", port);
+	for (c=*conn_list; c; c=c->next){
+		DBG("c=%p, c->id=%d, ip=",c, c->id);
+		print_ip(&c->ip);
+		DBG(" port=%d\n", c->port);
+		if (id){
+			if (id==c->id) return c;
+		}else if ((port==c->port)&&(ip_addr_cmp(ip, &c->ip))) return c;
+	}
+	return 0;
+}
+
+
+
+struct tcp_connection* tcpconn_get(int id, struct ip_addr* ip, int port)
+{
+	struct tcp_connection* c;
+	TCPCONN_LOCK;
+	c=tcpconn_find(id, ip, port);
+	if (c) c->refcnt++;
+	TCPCONN_UNLOCK;
+	return c;
+}
+
+
+
+void tcpconn_put(struct tcp_connection* c)
+{
+	c->refcnt--; /* FIXME: atomic_dec */
+}
+
+
+
+/* finds a tcpconn & sends on it */
+int tcp_send(char* buf, unsigned len, union sockaddr_union* to, int id)
+{
+	struct tcp_connection *c;
+	struct ip_addr ip;
+	int port;
+	long response[2];
+	int n;
+	
+	su2ip_addr(&ip, to);
+	port=su_getport(to);
+	
+	c=tcpconn_get(id, &ip, port); /* lock ;inc refcnt; unlock */
+	if (id){
+		if (c==0) {
+		LOG(L_ERR, "ERROR: tcp_send: id %d not found, dropping\n",
+					id);
+			return -1;
+		}
+	}else{
+		if (c==0){
+			DBG("tcp_send: no open tcp connection found, opening new one\n");
+			/* create tcp connection */
+			if ((c=tcpconn_connect(to))==0){
+				LOG(L_ERR, "ERROR: tcp_send: connect failed\n");
+				return 0;
+			}
+			c->refcnt++;
+			
+			/* send the new tcpconn to "tcp main" */
+			response[0]=(long)c;
+			response[1]=CONN_NEW;
+			n=write(unix_tcp_sock, response, sizeof(response));
+			n=send_fd(unix_tcp_sock, &c, sizeof(c), c->s);
+		}else{
+			DBG("tcp_send: tcp connection found, acquiring fd\n");
+			/* get the fd */
+			response[0]=(long)c;
+			response[1]=CONN_GET_FD;
+			n=write(unix_tcp_sock, response, sizeof(response));
+			n=receive_fd(unix_tcp_sock, &c, sizeof(c), &c->fd);
+		}
+	
+	}
+	DBG("tcp_send: sending...\n");
+	n=write(c->fd, buf, len);
+	close(c->fd);
+	tcpconn_put(c); /* release c (lock; dec refcnt; unlock) */
+	return n;
+}
+
+
+
 /* very ineficient for now, use hashtable some day - FIXME*/
-void tcpconn_timeout()
+void tcpconn_timeout(fd_set* set)
 {
 	struct tcp_connection *c, *next;
 	int ticks;;
 	
 	
 	ticks=get_ticks();
-	c=conn_list;
+	c=*conn_list;
 	while(c){
 		next=c->next;
 		if ((c->refcnt==0) && (ticks>c->timeout)) {
 			DBG("tcpconn_timeout: timeout for %p (%d > %d)\n",
 					c, ticks, c->timeout);
+			if (c->s>0) {
+				FD_CLR(c->s, set);
+				close(c->s);
+			}
 			tcpconn_rm(c);
 		}
 		c=next;
@@ -209,7 +341,8 @@ static int send2child(struct tcp_connection* tcpconn)
 				min_busy);
 	}
 	DBG("send2child: to child %d, %ld\n", idx, (long)tcpconn);
-	send_fd(tcp_children[idx].s, &tcpconn, sizeof(tcpconn), tcpconn->s);
+	send_fd(tcp_children[idx].unix_sock, &tcpconn, sizeof(tcpconn),
+			tcpconn->s);
 	
 	return 0; /* just to fix a warning*/
 }
@@ -226,7 +359,7 @@ void tcp_main_loop()
 	union sockaddr_union su;
 	struct tcp_connection* tcpconn;
 	long response[2];
-	int state;
+	int cmd;
 	int bytes;
 	socklen_t su_len;
 	struct timeval timeout;
@@ -242,10 +375,10 @@ void tcp_main_loop()
 		}
 	}
 	/* set all the unix sockets used for child comm */
-	for (r=0; r<tcp_children_no; r++){
-		if (tcp_children[r].s>=0){
-			FD_SET(tcp_children[r].s, &master_set);
-			if (tcp_children[r].s>maxfd) maxfd=tcp_children[r].s;
+	for (r=0; r<process_no; r++){
+		if (pt[r].unix_sock>=0){
+			FD_SET(pt[r].unix_sock, &master_set);
+			if (pt[r].unix_sock>maxfd) maxfd=pt[r].unix_sock;
 		}
 	}
 	
@@ -262,6 +395,7 @@ void tcp_main_loop()
 			/* errors */
 			LOG(L_ERR, "ERROR: tcp_main_loop: select:(%d) %s\n", errno,
 					strerror(errno));
+			n=0;
 		}
 		
 		for (r=0; r<sock_no && n; r++){
@@ -278,21 +412,25 @@ void tcp_main_loop()
 				}
 				
 				/* add socket to list */
-				tcpconn=tcpconn_add(new_sock, &su, r);
-				DBG("tcp_main_loop: new connection: %p %d\n",
+				tcpconn=tcpconn_new(new_sock, &su, r);
+				if (tcpconn){
+					tcpconn_add(tcpconn);
+					DBG("tcp_main_loop: new connection: %p %d\n",
 						tcpconn, tcpconn->s);
-				/* pass it to a child */
-				if(send2child(tcpconn)<0){
-					LOG(L_ERR,"ERROR: tcp_main_loop: no children available\n");
-					close(tcpconn->s);
-					tcpconn_rm(tcpconn);
+					/* pass it to a child */
+					if(send2child(tcpconn)<0){
+						LOG(L_ERR,"ERROR: tcp_main_loop: no children "
+								"available\n");
+						close(tcpconn->s);
+						tcpconn_rm(tcpconn);
+					}
 				}
 			}
 		}
 		
 		/* check all the read fds (from the tcpconn list) */
 		
-		for(tcpconn=conn_list; tcpconn && n; tcpconn=tcpconn->next){
+		for(tcpconn=*conn_list; tcpconn && n; tcpconn=tcpconn->next){
 			if ((tcpconn->refcnt==0)&&(FD_ISSET(tcpconn->s, &sel_set))){
 				/* new data available */
 				n--;
@@ -309,17 +447,18 @@ void tcp_main_loop()
 		}
 		
 		/* check unix sockets & listen | destroy connections */
-		for (r=0; r<tcp_children_no && n; r++){
-			if (FD_ISSET(tcp_children[r].s, &sel_set)){
+		/* start from 1, the "main" process does not transmit anything*/
+		for (r=1; r<process_no && n; r++){
+			if ( (pt[r].unix_sock>=0) && FD_ISSET(pt[r].unix_sock, &sel_set)){
 				n--;
 				/* errno==EINTR !!! TODO*/
 read_again:
-				bytes=read(tcp_children[r].s, response, sizeof(response));
+				bytes=read(pt[r].unix_sock, response, sizeof(response));
 				if (bytes==0){
 					/* EOF -> bad, child has died */
 					LOG(L_CRIT, "BUG: tcp_main_loop: dead child %d\n", r);
-					/* terminating everybody */
-					FD_CLR(tcp_children[r].s, &master_set);
+					/* don't listen on it any more */
+					FD_CLR(pt[r].unix_sock, &master_set);
 					/*exit(-1)*/;
 				}else if (bytes<0){
 					if (errno==EINTR) goto read_again;
@@ -330,45 +469,108 @@ read_again:
 					}
 				}
 					
-				DBG("tcp__main_loop: read response= %lx, %ld\n",
-						response[0], response[1]);
-				tcp_children[r].busy=0;
-				tcpconn=(struct tcp_connection*)response[0];
-				state=response[1];
-				if (tcpconn){
-					tcpconn->refcnt--;
-					if (state>=0){
-						/* listen on this too */
-						if (tcpconn->refcnt==0){
+				DBG("tcp_main_loop: read response= %lx, %ld from %d (%d)\n",
+						response[0], response[1], r, pt[r].pid);
+				cmd=response[1];
+				switch(cmd){
+					case CONN_RELEASE:
+						if (pt[r].idx>=0){
+							tcp_children[pt[r].idx].busy--;
+						}else{
+							LOG(L_CRIT, "BUG: tcp_main_loop: CONN_RELEASE\n");
+						}
+						tcpconn=(struct tcp_connection*)response[0];
+						if (tcpconn){
+							tcpconn->refcnt--;
+							DBG("tcp_main_loop: %p refcnt= %d\n", 
+									tcpconn, tcpconn->refcnt);
+								FD_SET(tcpconn->s, &master_set);
+								if (maxfd<tcpconn->s) maxfd=tcpconn->s;
+								/* update the timeout*/
+								tcpconn->timeout=get_ticks()+TCP_CON_TIMEOUT;
+						}
+						break;
+					case CONN_ERROR:
+					case CONN_DESTROY:
+					case CONN_EOF:
+						if (pt[r].idx>=0){
+							tcp_children[pt[r].idx].busy--;
+						}else{
+							LOG(L_CRIT, "BUG: tcp_main_loop: CONN_RELEASE\n");
+						}
+						tcpconn=(struct tcp_connection*)response[0];
+						if (tcpconn){
+							tcpconn->refcnt--;
+							if (tcpconn->refcnt==0){
+								DBG("tcp_main_loop: destroying connection\n");
+								close(tcpconn->s);
+								tcpconn_rm(tcpconn);
+							}else{
+								DBG("tcp_main_loop: delaying ...\n");
+							}
+						}
+						break;
+					case CONN_GET_FD:
+						/* send the requested FD  */
+						tcpconn=(struct tcp_connection*)response[0];
+						/* WARNING: take care of setting refcnt properly to
+						 * avoid race condition */
+						if (tcpconn){
+							send_fd(pt[r].unix_sock, &tcpconn,
+									sizeof(tcpconn), tcpconn->s);
+						}else{
+							LOG(L_CRIT, "BUG: tcp_main_loop: null pointer\n");
+						}
+						break;
+					case CONN_NEW:
+						/* update the fd in the requested tcpconn*/
+						tcpconn=(struct tcp_connection*)response[0];
+						/* WARNING: take care of setting refcnt properly to
+						 * avoid race condition */
+						if (tcpconn){
+							receive_fd(pt[r].unix_sock, &tcpconn,
+										sizeof(tcpconn), &tcpconn->s);
+							/* add tcpconn to the list*/
+							tcpconn_add(tcpconn);
 							FD_SET(tcpconn->s, &master_set);
 							if (maxfd<tcpconn->s) maxfd=tcpconn->s;
 							/* update the timeout*/
 							tcpconn->timeout=get_ticks()+TCP_CON_TIMEOUT;
-						}
-					}else{
-						/*error, we should destroy it */
-						if (tcpconn->refcnt==0){
-							DBG("tcp_main_loop: destroying connection\n");
-							close(tcpconn->s);
-							tcpconn_rm(tcpconn);
 						}else{
-							DBG("tcp_main_loop: delaying ...\n");
+							LOG(L_CRIT, "BUG: tcp_main_loop: null pointer\n");
 						}
-					}
-				}else{
-					LOG(L_CRIT, "BUG: tcp_main_loop: null tcp conn pointer\n");
+						break;
+					default:
+							LOG(L_CRIT, "BUG: tcp_main_loop: unknown cmd %d\n",
+									cmd);
 				}
 			}
 		}
 		
 		/* remove old connections */
-		tcpconn_timeout();
+		tcpconn_timeout(&master_set);
 	
 	}
 }
 
 
 
+int init_tcp()
+{
+	/* allocate list head*/
+	conn_list=shm_malloc(sizeof(struct tcp_connection*));
+	if (conn_list==0){
+		LOG(L_CRIT, "ERROR: tcp_init: memory allocation failure\n");
+		goto error;
+	}
+	*conn_list=0;
+	return 0;
+error:
+		return -1;
+}
+
+
+
 /* starts the tcp processes */
 int tcp_init_children()
 {
@@ -402,14 +604,17 @@ int tcp_init_children()
 			/* parent */
 			close(sockfd[1]);
 			tcp_children[r].pid=pid;
-			tcp_children[r].s=sockfd[0];
 			tcp_children[r].busy=0;
 			tcp_children[r].n_reqs=0;
+			tcp_children[r].unix_sock=sockfd[0];
 			pt[process_no].pid=pid;
+			pt[process_no].unix_sock=sockfd[0];
+			pt[process_no].idx=r;
 			strncpy(pt[process_no].desc, "tcp receiver", MAX_PT_DESC);
 		}else{
 			/* child */
 			close(sockfd[0]);
+			unix_tcp_sock=sockfd[1];
 			tcp_receive_loop(sockfd[1]);
 		}
 	}

+ 22 - 19
tcp_read.c

@@ -315,12 +315,12 @@ skip:
 int tcp_read_req(struct tcp_connection* con)
 {
 	int bytes;
-	int state;
+	int resp;
 	long size;
 	struct tcp_req* req;
 	int s;
 		
-		state=0;
+		resp=CONN_RELEASE;
 		s=con->fd;
 		req=&con->req;
 		if(req->complete==0 && req->error==TCP_REQ_OK){
@@ -330,12 +330,12 @@ int tcp_read_req(struct tcp_connection* con)
 					bytes, req->parsed-req->buf, req->state, req->error );
 			if (bytes==-1){
 				LOG(L_ERR, "ERROR: tcp_read_req: error reading \n");
-				state=-1;
+				resp=CONN_ERROR;
 				goto end_req;
 			}
 			if (bytes==0){
 				DBG( "tcp_read_req: EOF\n");
-				state=-1;
+				resp=CONN_EOF;
 				goto end_req;
 			}
 		
@@ -343,7 +343,7 @@ int tcp_read_req(struct tcp_connection* con)
 		if (req->error!=TCP_REQ_OK){
 			LOG(L_ERR,"ERROR: tcp_read_req: bad request, state=%d, error=%d\n",
 					req->state, req->error);
-			state=-1;
+			resp=CONN_ERROR;
 			goto end_req;
 		}
 		if (req->complete){
@@ -357,16 +357,19 @@ int tcp_read_req(struct tcp_connection* con)
 				req->error=TCP_REQ_BAD_LEN;
 				LOG(L_ERR, "ERROR: tcp_read_req: content length not present or"
 						" unparsable\n");
-				state=-1;
+				resp=CONN_ERROR;
 				goto end_req;
 			}
 			/* if we are here everything is nice and ok*/
-			state=0;
+			resp=CONN_RELEASE;
 			/* just for debugging use sendipv4 as receiving socket */
 			DBG("calling receive_msg(%p, %d, )\n",
 					req->buf, (int)(req->parsed-req->buf));
 			bind_address=sendipv4; /*&tcp_info[con->sock_idx];*/
-			receive_msg(req->buf, req->parsed-req->buf, &con->su);
+			if (receive_msg(req->buf, req->parsed-req->buf, &con->su)<0){
+				resp=CONN_ERROR;
+				goto end_req;
+			}
 			
 			/* prepare for next request */
 			size=req->pos-req->body;
@@ -385,7 +388,7 @@ int tcp_read_req(struct tcp_connection* con)
 		
 		
 	end_req:
-		return state;
+		return resp;
 }
 
 
@@ -413,7 +416,7 @@ void tcp_receive_loop(int unix_sock)
 	int n;
 	int nfds;
 	int s;
-	long state;
+	long resp;
 	fd_set master_set;
 	fd_set sel_set;
 	int maxfd;
@@ -463,13 +466,13 @@ void tcp_receive_loop(int unix_sock)
 				if (s==-1) {
 					LOG(L_ERR, "ERROR: tcp_receive_loop: read_fd:"
 									"no fd read\n");
-					state=-1;
-					release_tcpconn(con, state, unix_sock);
+					resp=CONN_ERROR;
+					release_tcpconn(con, resp, unix_sock);
 				}
 				if (con==0){
 					LOG(L_ERR, "ERROR: tcp_receive_loop: null pointer\n");
-					state=-1;
-					release_tcpconn(con, state, unix_sock);
+					resp=CONN_ERROR;
+					release_tcpconn(con, resp, unix_sock);
 				}
 				con->timeout=get_ticks()+TCP_CHILD_TIMEOUT;
 				FD_SET(s, &master_set);
@@ -481,11 +484,11 @@ void tcp_receive_loop(int unix_sock)
 				c_next=con->c_next; /* safe for removing*/
 				if (nfds && FD_ISSET(con->fd, &sel_set)){
 					nfds--;
-					state=tcp_read_req(con);
-					if (state==-1){
+					resp=tcp_read_req(con);
+					if (resp<0){
 						FD_CLR(con->fd, &master_set);
 						tcpconn_listrm(list, con, c_next, c_prev);
-						release_tcpconn(con, state, unix_sock);
+						release_tcpconn(con, resp, unix_sock);
 					}else{
 						/* update timeout */
 						con->timeout=ticks+TCP_CHILD_TIMEOUT;
@@ -496,10 +499,10 @@ void tcp_receive_loop(int unix_sock)
 						/* expired, return to "tcp main" */
 						DBG("tcp_receive_loop: %p expired (%d, %d)\n",
 								con, con->timeout, ticks);
-						state=0;
+						resp=CONN_RELEASE;
 						FD_CLR(con->fd, &master_set);
 						tcpconn_listrm(list, con, c_next, c_prev);
-						release_tcpconn(con, state, unix_sock);
+						release_tcpconn(con, resp, unix_sock);
 					}
 				}
 			}

+ 46 - 0
tcp_server.h

@@ -0,0 +1,46 @@
+/*
+ * $Id$
+ *
+ * Copyright (C) 2001-2003 Fhg Fokus
+ *
+ * This file is part of ser, a free SIP server.
+ *
+ * ser is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * For a license to use the ser software under conditions
+ * other than those described here, or to purchase support for this
+ * software, please contact iptel.org by e-mail at the following addresses:
+ *    [email protected]
+ *
+ * ser is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+
+#ifndef tcp_server_h
+#define tcp_server_h
+
+
+
+
+/* "public" functions*/
+
+struct tcp_connection* tcpconn_get(int id, struct ip_addr* ip, int port);
+void tcpconn_put(struct tcp_connection* c);
+int tcp_send(char* buf, unsigned len, union sockaddr_union* to, int id);
+
+
+
+
+
+
+#endif