Selaa lähdekoodia

websocket: fetch ws connections based on keepalive process index

- get the ones with id that matches the index when doing modulo over the
number of keepalive process in order to spread the load of doing the
ping-pong
Daniel-Constantin Mierla 6 vuotta sitten
vanhempi
commit
71517152e7

+ 2 - 2
src/modules/websocket/websocket.c

@@ -66,7 +66,7 @@ static int ws_keepalive_interval = DEFAULT_KEEPALIVE_INTERVAL;
 static int ws_keepalive_timeout = DEFAULT_KEEPALIVE_TIMEOUT;
 
 #define DEFAULT_KEEPALIVE_PROCESSES 1
-static int ws_keepalive_processes = DEFAULT_KEEPALIVE_PROCESSES;
+int ws_keepalive_processes = DEFAULT_KEEPALIVE_PROCESSES;
 
 int ws_verbose_list = 0;
 
@@ -284,7 +284,7 @@ static int child_init(int rank)
 			&& ws_keepalive_mechanism != KEEPALIVE_MECHANISM_NONE) {
 		for(i = 0; i < ws_keepalive_processes; i++) {
 			if(fork_sync_timer(PROC_TIMER, "WEBSOCKET KEEPALIVE", 1,
-					   ws_keepalive, NULL, ws_keepalive_interval)
+					   ws_keepalive, (void*)(long)i, ws_keepalive_interval)
 					< 0) {
 				LM_ERR("starting keepalive process\n");
 				return -1;

+ 24 - 17
src/modules/websocket/ws_conn.c

@@ -39,6 +39,7 @@
 
 extern int ws_verbose_list;
 extern str ws_event_callback;
+extern int ws_keepalive_processes;
 
 ws_connection_t **wsconn_id_hash = NULL;
 #define wsconn_listadd tcpconn_listadd
@@ -53,7 +54,7 @@ gen_lock_t *wsconn_lock = NULL;
 
 gen_lock_t *wsstat_lock = NULL;
 
-ws_connection_used_list_t *wsconn_used_list = NULL;
+ws_connection_list_t *wsconn_used_list = NULL;
 
 stat_var *ws_current_connections;
 stat_var *ws_max_concurrent_connections;
@@ -103,13 +104,13 @@ int wsconn_init(void)
 	memset((void *)wsconn_id_hash, 0,
 			TCP_ID_HASH_SIZE * sizeof(ws_connection_t *));
 
-	wsconn_used_list = (ws_connection_used_list_t *)shm_malloc(
-			sizeof(ws_connection_used_list_t));
+	wsconn_used_list = (ws_connection_list_t *)shm_malloc(
+			sizeof(ws_connection_list_t));
 	if(wsconn_used_list == NULL) {
 		LM_ERR("allocating WebSocket used list\n");
 		goto error;
 	}
-	memset((void *)wsconn_used_list, 0, sizeof(ws_connection_used_list_t));
+	memset((void *)wsconn_used_list, 0, sizeof(ws_connection_list_t));
 
 	return 0;
 
@@ -573,7 +574,7 @@ int wsconn_put_list(ws_connection_t **list_head)
 }
 
 
-ws_connection_id_t *wsconn_get_list_ids(void)
+ws_connection_id_t *wsconn_get_list_ids(int idx)
 {
 	ws_connection_id_t *list = NULL;
 	ws_connection_t *wsc = NULL;
@@ -589,10 +590,13 @@ ws_connection_id_t *wsconn_get_list_ids(void)
 	/* get the number of used connections */
 	wsc = wsconn_used_list->head;
 	while(wsc) {
-		if(ws_verbose_list)
-			LM_DBG("counter wsc [%p] prev => [%p] next => [%p]\n", wsc,
-					wsc->used_prev, wsc->used_next);
-		list_len++;
+		if(wsc->id % ws_keepalive_processes == idx) {
+			if(ws_verbose_list) {
+				LM_DBG("counter wsc [%p] prev => [%p] next => [%p] (%d/%d)\n",
+						wsc, wsc->used_prev, wsc->used_next, wsc->id, idx);
+			}
+			list_len++;
+		}
 		wsc = wsc->used_next;
 	}
 
@@ -615,11 +619,13 @@ ws_connection_id_t *wsconn_get_list_ids(void)
 			break;
 		}
 
-		list[i].id = wsc->id;
-		wsconn_ref(wsc);
-		if(ws_verbose_list)
-			LM_DBG("wsc [%p] id [%d] ref++\n", wsc, wsc->id);
-
+		if(wsc->id % ws_keepalive_processes == idx) {
+			list[i].id = wsc->id;
+			wsconn_ref(wsc);
+			if(ws_verbose_list) {
+				LM_DBG("wsc [%p] id [%d] (%d) - ref++\n", wsc, wsc->id, idx);
+			}
+		}
 		wsc = wsc->used_next;
 	}
 	list[i].id = -1; /* explicit -1 termination */
@@ -627,10 +633,11 @@ ws_connection_id_t *wsconn_get_list_ids(void)
 end:
 	WSCONN_UNLOCK;
 
-	if(ws_verbose_list)
+	if(ws_verbose_list) {
 		LM_DBG("wsconn get list id returns list [%p]"
-			   " with [%d] members\n",
-				list, (int)list_len);
+			   " with [%d] members (%d)\n",
+				list, (int)list_len, idx);
+	}
 
 	return list;
 }

+ 3 - 3
src/modules/websocket/ws_conn.h

@@ -72,14 +72,14 @@ typedef struct
 {
 	ws_connection_t *head;
 	ws_connection_t *tail;
-} ws_connection_used_list_t;
+} ws_connection_list_t;
 
 typedef enum {
 	WSCONN_EVENTROUTE_NO = 0,
 	WSCONN_EVENTROUTE_YES
 } ws_conn_eventroute_t;
 
-extern ws_connection_used_list_t *wsconn_used_list;
+extern ws_connection_list_t *wsconn_used_list;
 
 extern char *wsconn_state_str[];
 
@@ -100,7 +100,7 @@ ws_connection_t *wsconn_get(int id);
 int wsconn_put(ws_connection_t *wsc);
 ws_connection_t **wsconn_get_list(void);
 int wsconn_put_list(ws_connection_t **list);
-ws_connection_id_t *wsconn_get_list_ids(void);
+ws_connection_id_t *wsconn_get_list_ids(int idx);
 int wsconn_put_list_ids(ws_connection_id_t *list);
 int wsconn_put_id(int id);
 void ws_rpc_dump(rpc_t *rpc, void *ctx);

+ 2 - 1
src/modules/websocket/ws_frame.c

@@ -799,9 +799,10 @@ void ws_keepalive(unsigned int ticks, void *param)
 	ws_connection_id_t *list_head = NULL;
 	ws_connection_t *wsc = NULL;
 	int i = 0;
+	int idx = (int)(long)param;
 
 	/* get an array of pointer to all ws connection */
-	list_head = wsconn_get_list_ids();
+	list_head = wsconn_get_list_ids(idx);
 	if(!list_head)
 		return;