Przeglądaj źródła

websocket: destroy ws connections in dedicated timer process

- avoid races/invalid access when sending data failed and the connection
was removed
- added parameter to control the timer process: interval or running and
delay interval for removing
Daniel-Constantin Mierla 6 lat temu
rodzic
commit
af09e224db

+ 31 - 7
src/modules/websocket/websocket.c

@@ -60,6 +60,12 @@ static int ws_init_rpc(void);
 
 sl_api_t ws_slb;
 
+#define WS_DEFAULT_RM_DELAY_INTERVAL 5
+static int ws_rm_delay_interval = WS_DEFAULT_RM_DELAY_INTERVAL;
+
+#define DEFAULT_TIMER_INTERVAL 1
+static int ws_timer_interval = DEFAULT_TIMER_INTERVAL;
+
 #define DEFAULT_KEEPALIVE_INTERVAL 1
 static int ws_keepalive_interval = DEFAULT_KEEPALIVE_INTERVAL;
 
@@ -107,6 +113,9 @@ static param_export_t params[] = {
 	{ "keepalive_interval",		INT_PARAM, &ws_keepalive_interval },
 	{ "keepalive_processes",	INT_PARAM, &ws_keepalive_processes },
 
+	{ "timer_interval",		INT_PARAM, &ws_timer_interval },
+	{ "rm_delay_interval",	INT_PARAM, &ws_rm_delay_interval },
+
 	{ "verbose_list",		PARAM_INT, &ws_verbose_list },
 	{ "event_callback",		PARAM_STR, &ws_event_callback},
 
@@ -227,6 +236,13 @@ static int mod_init(void)
 		/* Add extra process/timer for the keepalive process */
 		register_sync_timers(ws_keepalive_processes);
 	}
+	if(ws_timer_interval < 1 || ws_timer_interval > 60)
+		ws_timer_interval = DEFAULT_TIMER_INTERVAL;
+	/* timer routing to clean up inactive connections */
+	register_sync_timers(1);
+
+	if(ws_rm_delay_interval < 1 || ws_rm_delay_interval > 60)
+		ws_rm_delay_interval = WS_DEFAULT_RM_DELAY_INTERVAL;
 
 	if(ws_sub_protocols & SUB_PROTOCOL_MSRP
 			&& !sr_event_enabled(SREV_TCP_MSRP_FRAME))
@@ -280,16 +296,24 @@ static int child_init(int rank)
 	if(rank == PROC_INIT || rank == PROC_TCP_MAIN)
 		return 0;
 
-	if(rank == PROC_MAIN
-			&& ws_keepalive_mechanism != KEEPALIVE_MECHANISM_NONE) {
-		for(i = 0; i < ws_keepalive_processes; i++) {
-			if(fork_sync_timer(PROC_TIMER, "WEBSOCKET KEEPALIVE", 1,
-					   ws_keepalive, (void*)(long)i, ws_keepalive_interval)
+	if(rank == PROC_MAIN) {
+		if(ws_keepalive_mechanism != KEEPALIVE_MECHANISM_NONE) {
+			for(i = 0; i < ws_keepalive_processes; i++) {
+				if(fork_sync_timer(PROC_TIMER, "WEBSOCKET KEEPALIVE", 1,
+						   ws_keepalive, (void*)(long)i, ws_keepalive_interval)
+						< 0) {
+					LM_ERR("starting keepalive process\n");
+					return -1;
+				}
+			}
+		}
+		if(fork_sync_timer(PROC_TIMER, "WEBSOCKET TIMER", 1,
+			   ws_timer, NULL, ws_timer_interval)
 					< 0) {
-				LM_ERR("starting keepalive process\n");
+				LM_ERR("starting timer process\n");
 				return -1;
-			}
 		}
+
 	}
 
 	return 0;

+ 68 - 32
src/modules/websocket/ws_conn.c

@@ -40,6 +40,7 @@
 extern int ws_verbose_list;
 extern str ws_event_callback;
 extern int ws_keepalive_processes;
+extern int ws_rm_delay_interval;
 
 ws_connection_t **wsconn_id_hash = NULL;
 #define wsconn_listadd tcpconn_listadd
@@ -383,54 +384,56 @@ void wsconn_close_now(ws_connection_t *wsc)
 	con->timeout = get_ticks_raw();
 }
 
+void wsconn_detach_connection(ws_connection_t *wsc)
+{
+	/* Remove from the WebSocket used list */
+	if(wsconn_used_list->head == wsc)
+		wsconn_used_list->head = wsc->used_next;
+	if(wsconn_used_list->tail == wsc)
+		wsconn_used_list->tail = wsc->used_prev;
+	if(wsc->used_prev)
+		wsc->used_prev->used_next = wsc->used_next;
+	if(wsc->used_next)
+		wsc->used_next->used_prev = wsc->used_prev;
+
+	/* remove from wsconn_id_hash */
+	wsconn_listrm(wsconn_id_hash[wsc->id_hash], wsc, id_next, id_prev);
+
+	/* stat */
+	update_stat(ws_current_connections, -1);
+	if(wsc->sub_protocol == SUB_PROTOCOL_SIP)
+		update_stat(ws_sip_current_connections, -1);
+	else if(wsc->sub_protocol == SUB_PROTOCOL_MSRP)
+		update_stat(ws_msrp_current_connections, -1);
+}
+
 /* mode controls if lock needs to be aquired */
 int wsconn_put_mode(ws_connection_t *wsc, int mode)
 {
-	int destroy = 0;
+	if(!wsc)
+		return -1;
 
 	LM_DBG("wsconn_put start for [%p] refcnt [%d]\n", wsc,
 			atomic_get(&wsc->refcnt));
 
-	if(!wsc)
-		return -1;
-
 	if(mode) {
 		WSCONN_LOCK;
 	}
+	if(wsc->state == WS_S_REMOVING) {
+		goto done;
+	}
 	/* refcnt == 0*/
 	if(wsconn_unref(wsc)) {
-		/* Remove from the WebSocket used list */
-		if(wsconn_used_list->head == wsc)
-			wsconn_used_list->head = wsc->used_next;
-		if(wsconn_used_list->tail == wsc)
-			wsconn_used_list->tail = wsc->used_prev;
-		if(wsc->used_prev)
-			wsc->used_prev->used_next = wsc->used_next;
-		if(wsc->used_next)
-			wsc->used_next->used_prev = wsc->used_prev;
-
-		/* remove from wsconn_id_hash */
-		wsconn_listrm(wsconn_id_hash[wsc->id_hash], wsc, id_next, id_prev);
-
-		/* stat */
-		update_stat(ws_current_connections, -1);
-		if(wsc->sub_protocol == SUB_PROTOCOL_SIP)
-			update_stat(ws_sip_current_connections, -1);
-		else if(wsc->sub_protocol == SUB_PROTOCOL_MSRP)
-			update_stat(ws_msrp_current_connections, -1);
-
-		destroy = 1;
+		wsc->state = WS_S_REMOVING;
+		wsc->rmticks = get_ticks();
 	}
-	if(mode) {
-		WSCONN_UNLOCK;
-	}
-
 	LM_DBG("wsconn_put end for [%p] refcnt [%d]\n", wsc,
 			atomic_get(&wsc->refcnt));
 
-	/* wsc is removed from all lists and can be destroyed safely */
-	if(destroy)
-		wsconn_dtor(wsc);
+done:
+	if(mode) {
+		WSCONN_UNLOCK;
+	}
 
 	return 0;
 }
@@ -662,6 +665,39 @@ int wsconn_put_list_ids(ws_connection_id_t *list_head)
 	return 0;
 }
 
+void ws_timer(unsigned int ticks, void *param)
+{
+	ws_connection_list_t rmlist;
+	ws_connection_t *wsc;
+	ws_connection_t *next;
+	ticks_t nticks;
+	int h;
+
+	rmlist.head = NULL;
+	nticks = get_ticks();
+
+	WSCONN_LOCK;
+	for(h = 0; h < TCP_ID_HASH_SIZE; h++) {
+		wsc = wsconn_id_hash[h];
+		while(wsc) {
+			next = wsc->id_next;
+			if(wsc->state == WS_S_REMOVING
+					&& wsc->rmticks <= nticks - ws_rm_delay_interval) {
+				wsconn_detach_connection(wsc);
+				wsc->id_next = rmlist.head;
+				rmlist.head = wsc;
+			}
+			wsc = next;
+		}
+	}
+	WSCONN_UNLOCK;
+
+	for(wsc = rmlist.head; wsc; ) {
+		next = wsc->id_next;
+		wsconn_dtor(wsc);
+		wsc = next;
+	}
+}
 
 static int ws_rpc_add_node(
 		rpc_t *rpc, void *ctx, void *ih, ws_connection_t *wsc)

+ 4 - 0
src/modules/websocket/ws_conn.h

@@ -31,11 +31,13 @@
 
 #include "../../core/counters.h"
 #include "../../core/rpc.h"
+#include "../../core/timer.h"
 
 typedef enum {
 	WS_S_CONNECTING = 0, /* Never used - included for completeness */
 	WS_S_OPEN,
 	WS_S_CLOSING,
+	WS_S_REMOVING,
 	WS_S_CLOSED /* Never used - included for completeness */
 } ws_conn_state_t;
 
@@ -43,6 +45,7 @@ typedef struct ws_connection
 {
 	ws_conn_state_t state;
 	int awaiting_pong;
+	ticks_t rmticks;
 
 	int last_used;
 	struct ws_connection *used_prev;
@@ -104,4 +107,5 @@ ws_connection_id_t *wsconn_get_list_ids(int idx);
 int wsconn_put_list_ids(ws_connection_id_t *list);
 int wsconn_put_id(int id);
 void ws_rpc_dump(rpc_t *rpc, void *ctx);
+void ws_timer(unsigned int ticks, void *param);
 #endif /* _WS_CONN_H */