Przeglądaj źródła

core: udp - option to group multithreaded receivers by group

- set same agname for socket structures
- udp_receiver_mode has to be set to 2
Daniel-Constantin Mierla 10 miesięcy temu
rodzic
commit
de99a82680
3 zmienionych plików z 115 dodań i 11 usunięć
  1. 28 4
      src/core/udp_server.c
  2. 1 1
      src/core/udp_server.h
  3. 86 6
      src/main.c

+ 28 - 4
src/core/udp_server.c

@@ -1025,6 +1025,10 @@ void *ksr_udp_mtworker(void *si)
 	rcvi.dst_ip = tsock->address;
 	rcvi.proto = PROTO_UDP;
 
+	if(tsock->agroup.agname[0] != '\0') {
+		gname.s = tsock->agroup.agname;
+		gname.len = strlen(gname.s);
+	}
 	awg = async_task_group_find(&gname);
 
 	while(1) {
@@ -1063,10 +1067,16 @@ void *ksr_udp_mtworker(void *si)
 		rcvi.src_port = su_getport(fromaddr);
 
 		if(awg == NULL) {
+			if(tsock->agroup.agname[0] != '\0') {
+				gname.s = tsock->agroup.agname;
+				gname.len = strlen(gname.s);
+			}
 			awg = async_task_group_find(&gname);
 		}
-		if(awg == NULL) {
+		if(awg != NULL) {
 			udpworker_task_send(awg, buf, len, &rcvi);
+		} else {
+			LM_WARN("workers group [%s] not found\n", gname.s);
 		}
 	}
 }
@@ -1074,7 +1084,7 @@ void *ksr_udp_mtworker(void *si)
 /**
  *
  */
-int ksr_udp_start_mtreceiver(int child_rank, int *woneinit)
+int ksr_udp_start_mtreceiver(int child_rank, char *agname, int *woneinit)
 {
 	socket_info_t *si;
 	pthread_t *udpthreads = NULL;
@@ -1082,12 +1092,16 @@ int ksr_udp_start_mtreceiver(int child_rank, int *woneinit)
 	int rc = 0;
 	int i = 0;
 	int pid;
+	char si_desc[MAX_PT_DESC];
 
 	if(udp_listen == NULL) {
 		return 0;
 	}
 
-	pid = fork_process(child_rank, "UDP MULTITREADED RECEIVER", 1);
+	snprintf(si_desc, MAX_PT_DESC, "udp multithreaded receiver (%s)",
+			(agname) ? agname : "udp");
+
+	pid = fork_process(child_rank, si_desc, 1);
 	if(pid < 0) {
 		LM_CRIT("cannot fork\n");
 		goto error;
@@ -1109,7 +1123,12 @@ int ksr_udp_start_mtreceiver(int child_rank, int *woneinit)
 		}
 		/* udp workers */
 		for(si = udp_listen; si; si = si->next) {
-			nrthreads++;
+			if(agname == NULL) {
+				nrthreads++;
+			} else if((si->agroup.agname[0] != '\0')
+					  && (strcmp(agname, si->agroup.agname) == 0)) {
+				nrthreads++;
+			}
 		}
 		udpthreads = (pthread_t *)malloc(nrthreads * sizeof(pthread_t));
 		if(udpthreads == NULL) {
@@ -1119,6 +1138,11 @@ int ksr_udp_start_mtreceiver(int child_rank, int *woneinit)
 		memset(udpthreads, 0, nrthreads * sizeof(pthread_t));
 		i = 0;
 		for(si = udp_listen; si; si = si->next) {
+			if(!((agname == NULL)
+					   || ((si->agroup.agname[0] != '\0')
+							   && (strcmp(agname, si->agroup.agname) == 0)))) {
+				continue;
+			}
 			LM_DBG("creating udp thread worker[%d] [%.*s]\n", i,
 					si->sock_str.len, si->sock_str.s);
 			rc = pthread_create(

+ 1 - 1
src/core/udp_server.h

@@ -40,6 +40,6 @@ int udp_init(struct socket_info *si);
 int udp_send(struct dest_info *dst, char *buf, unsigned len);
 int udp_rcv_loop(void);
 
-int ksr_udp_start_mtreceiver(int child_rank, int *woneinit);
+int ksr_udp_start_mtreceiver(int child_rank, char *agname, int *woneinit);
 
 #endif

+ 86 - 6
src/main.c

@@ -504,6 +504,7 @@ int ser_kill_timeout = DEFAULT_SER_KILL_TIMEOUT;
 int ksr_verbose_startup = 0;
 int ksr_all_errors = 0;
 int ksr_udp_receiver_mode = 0;
+int ksr_udp_mtreceivers = 0;
 
 /* cfg parsing */
 int cfg_errors = 0;
@@ -1374,12 +1375,14 @@ int main_loop(void)
 	int i;
 	pid_t pid;
 	struct socket_info *si;
+	struct socket_info *sx;
 	char si_desc[MAX_PT_DESC];
 #ifdef EXTRA_DEBUG
 	int r;
 #endif
 	int nrprocs;
 	int woneinit;
+	int agfound = 0;
 
 	if(_sr_instance_started == NULL) {
 		_sr_instance_started = shm_malloc(sizeof(int));
@@ -1591,11 +1594,34 @@ int main_loop(void)
 				/* children_no per each socket */
 				cfg_register_child(
 						(si->workers > 0) ? si->workers : children_no);
+			} else if(ksr_udp_receiver_mode == 2) {
+				if(si->agroup.agname[0] != '\0') {
+					agfound = 0;
+					for(sx = udp_listen; sx != si; sx = sx->next) {
+						if(sx->agroup.agname[0] != '\0') {
+							if(strcmp(sx->agroup.agname, si->agroup.agname)
+									== 0) {
+								agfound = 1;
+								break;
+							}
+						}
+					}
+					if(agfound == 0) {
+						/* one udp multi-threaded worker */
+						cfg_register_child(1);
+						ksr_udp_mtreceivers++;
+					}
+				} else {
+					/* children_no per each socket */
+					cfg_register_child(
+							(si->workers > 0) ? si->workers : children_no);
+				}
 			}
 		}
 		if(udp_listen && (ksr_udp_receiver_mode == 1)) {
 			/* main udp multi-threaded worker */
 			cfg_register_child(1);
+			ksr_udp_mtreceivers++;
 		}
 
 #ifdef USE_RAW_SOCKS
@@ -1780,13 +1806,44 @@ int main_loop(void)
 		}
 		if(udp_listen && (ksr_udp_receiver_mode == 1)) {
 			child_rank++;
-			if(ksr_udp_start_mtreceiver(child_rank, &woneinit) < 0) {
+			if(ksr_udp_start_mtreceiver(child_rank, NULL, &woneinit) < 0) {
 				goto error;
 			}
 		}
+		if(udp_listen && (ksr_udp_receiver_mode == 2)) {
+			for(si = udp_listen; si; si = si->next) {
+				if(si->agroup.agname[0] == '\0') {
+					continue;
+				}
+				agfound = 0;
+				for(sx = udp_listen; sx != si; sx = sx->next) {
+					if(sx->agroup.agname[0] != '\0') {
+						if(strcmp(sx->agroup.agname, si->agroup.agname) == 0) {
+							agfound = 1;
+							break;
+						}
+					}
+				}
+				if(agfound == 0) {
+					child_rank++;
+					if(ksr_udp_start_mtreceiver(
+							   child_rank, si->agroup.agname, &woneinit)
+							< 0) {
+						goto error;
+					}
+				}
+			}
+		}
 		/* udp processes */
-		for(si = udp_listen; si && (ksr_udp_receiver_mode == 0);
-				si = si->next) {
+		if(ksr_udp_receiver_mode == 0 || ksr_udp_receiver_mode == 2) {
+			agfound = 1;
+		} else {
+			agfound = 0;
+		}
+		for(si = udp_listen; si && (agfound == 1); si = si->next) {
+			if((ksr_udp_receiver_mode == 2) && (si->agroup.agname[0] != '\0')) {
+				continue;
+			}
 			nrprocs = (si->workers > 0) ? si->workers : children_no;
 			for(i = 0; i < nrprocs; i++) {
 				if(si->address.af == AF_INET6) {
@@ -2038,8 +2095,10 @@ error:
  */
 static int calc_proc_no(void)
 {
-	int udp_listeners;
+	int udp_listeners = 0;
 	struct socket_info *si;
+	struct socket_info *sx;
+	int agfound;
 #ifdef USE_TCP
 	int tcp_listeners;
 	int tcp_e_listeners;
@@ -2050,9 +2109,30 @@ static int calc_proc_no(void)
 
 	if(ksr_udp_receiver_mode == 1) {
 		udp_listeners = 1;
+	} else if(ksr_udp_receiver_mode == 2) {
+		for(si = udp_listen; si; si = si->next) {
+			if(si->agroup.agname[0] == '\0') {
+				udp_listeners += (si->workers > 0) ? si->workers : children_no;
+			} else {
+				agfound = 0;
+				for(sx = udp_listen; sx != si; sx = sx->next) {
+					if(sx->agroup.agname[0] != '\0') {
+						if(strcmp(sx->agroup.agname, si->agroup.agname) == 0) {
+							agfound = 1;
+							break;
+						}
+					}
+				}
+				if(agfound == 0) {
+					udp_listeners += 1;
+				}
+			}
+		}
+		udp_listeners += ksr_udp_mtreceivers;
 	} else {
-		for(si = udp_listen, udp_listeners = 0; si; si = si->next)
+		for(si = udp_listen; si; si = si->next) {
 			udp_listeners += (si->workers > 0) ? si->workers : children_no;
+		}
 	}
 #ifdef USE_TCP
 	for(si = tcp_listen, tcp_listeners = 0, tcp_e_listeners = 0; si;
@@ -2955,7 +3035,7 @@ int main(int argc, char **argv)
 		}
 	}
 
-	if(ksr_udp_receiver_mode != 1) {
+	if(ksr_udp_receiver_mode != 1 && ksr_udp_receiver_mode != 2) {
 		ksr_udp_receiver_mode = 0;
 	}