Browse Source

modules/websockets: started adding WebSocket connection management to WebSocket module

Peter Dunkley 13 năm trước cách đây
mục cha
commit
857b117c50

+ 187 - 0
modules/websocket/ws_conn.c

@@ -0,0 +1,187 @@
+/*
+ * $Id$
+ *
+ * Copyright (C) 2012 Crocodile RCS Ltd
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+#include "../../locking.h"
+#include "../../tcp_conn.h"
+#include "../../mem/mem.h"
+#include "ws_conn.h"
+
+struct ws_connection **wsconn_hash = NULL;
+gen_lock_t *wsconn_lock = NULL;
+
+char *wsconn_state_str[] =
+{
+	"CONNECTING",
+	"OPEN",
+	"CLOSING",
+	"CLOSED"
+};
+
+static inline void _wsconn_rm(ws_connection_t *wsc);
+
+int wsconn_init(void)
+{
+	wsconn_lock = lock_alloc();
+	if (wsconn_lock == NULL)
+	{
+		LM_ERR("allocating lock\n");
+		return -1;
+	}
+	if (lock_init(wsconn_lock) == 0)
+	{
+		LM_ERR("initialising lock\n");
+		lock_dealloc((void *) wsconn_lock);
+		wsconn_lock = NULL;
+		return -1;
+	}
+
+	wsconn_hash =
+		(ws_connection_t **) shm_malloc(TCP_ID_HASH_SIZE *
+						sizeof(ws_connection_t));
+	if (wsconn_hash == NULL)
+	{
+		LM_ERR("allocating WebSocket hash-table\n");
+		lock_dealloc((void *) wsconn_lock);
+		wsconn_lock = NULL;
+		return -1;
+	}
+	memset((void *) wsconn_hash, 0,
+		TCP_ID_HASH_SIZE * sizeof(ws_connection_t *));
+
+	return 0;
+}
+
+void wsconn_destroy(void)
+{
+	int h;
+
+	if (wsconn_hash)
+	{
+		WSCONN_UNLOCK;
+		WSCONN_LOCK;
+		for (h = 0; h < TCP_ID_HASH_SIZE; h++)
+		{
+			ws_connection_t *wsc = wsconn_hash[h];
+			while (wsc)
+			{
+				ws_connection_t *next = wsc->next;
+				_wsconn_rm(wsc);
+				wsc = next;
+			}
+		}
+		WSCONN_UNLOCK;
+
+		shm_free(wsconn_hash);
+		wsconn_hash = NULL;
+	}
+
+	if (wsconn_lock)
+	{
+		lock_destroy(wsconn_lock);
+		lock_dealloc((void *) wsconn_lock);
+		wsconn_lock = NULL;
+	}
+}
+
+int wsconn_add(struct tcp_connection *con)
+{
+	ws_connection_t *wsc;
+
+	if (!con)
+	{
+		LM_ERR("wsconn_add: null pointer\n");
+		return -1;
+	}
+
+	wsc = shm_malloc(sizeof(ws_connection_t));
+	if (wsc == NULL)
+	{
+		LM_ERR("allocating shared memory\n");
+		return -1;
+	}
+	memset(wsc, 0, sizeof(ws_connection_t));
+
+	wsc->con = con;
+	wsc->id_hash = con->id_hash;
+	wsc->last_used = (int)time(NULL);
+	wsc->state = WS_S_OPEN;
+
+	/* Make sure Kamailio core sends future messages on this connection
+	   directly to this module */
+	con->flags |= F_CONN_WS;
+
+	WSCONN_LOCK;
+	wsc->next = wsconn_hash[wsc->id_hash];
+	wsc->prev = NULL;
+	if (wsconn_hash[wsc->id_hash]) wsconn_hash[wsc->id_hash]->prev = wsc;
+	wsconn_hash[wsc->id_hash] = wsc;
+	WSCONN_UNLOCK;
+
+	return 0;
+}
+
+static inline void _wsconn_rm(ws_connection_t *wsc)
+{
+	if (wsconn_hash[wsc->id_hash] == wsc)
+		wsconn_hash[wsc->id_hash] = wsc->next;
+	if (wsc->next) wsc->next->prev = wsc->prev;
+	if (wsc->prev) wsc->prev->next = wsc->next;
+	shm_free(wsc);
+	wsc = NULL;
+}
+
+int wsconn_rm(ws_connection_t *wsc)
+{
+	if (!wsc)
+	{
+		LM_ERR("wsconn_rm: null pointer\n");
+		return -1;
+	}
+
+	WSCONN_LOCK;
+	_wsconn_rm(wsc);
+	WSCONN_UNLOCK;
+
+	return 0;
+}
+
+ws_connection_t *wsconn_find(struct tcp_connection *con)
+{
+	ws_connection_t *wsc;
+
+	if (!con)
+	{
+		LM_ERR("wsconn_find: null pointer\n");
+		return NULL;
+	}
+
+	WSCONN_LOCK;
+	for (wsc = wsconn_hash[con->id_hash]; wsc; wsc = wsc->next)
+	{
+		if (wsc->id_hash == con->id_hash)
+			return wsc;
+	}
+	WSCONN_UNLOCK;
+
+	return NULL;
+}

+ 63 - 0
modules/websocket/ws_conn.h

@@ -0,0 +1,63 @@
+/*
+ * $Id$
+ *
+ * Copyright (C) 2012 Crocodile RCS Ltd
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+#ifndef _WS_CONN_H
+#define _WS_CONN_H
+
+#include "../../locking.h"
+#include "../../tcp_conn.h"
+
+typedef enum
+{
+	WS_S_CONNECTING	= 0,
+	WS_S_OPEN,
+	WS_S_CLOSING,
+	WS_S_CLOSED
+} ws_conn_state_t;
+
+typedef struct ws_connection
+{
+	struct tcp_connection *con;
+
+	ws_conn_state_t state;
+	unsigned int id_hash;
+	unsigned int last_used;
+
+	struct ws_connection *prev;
+	struct ws_connection *next;
+} ws_connection_t;
+
+extern ws_connection_t **wsconn_hash;
+extern gen_lock_t *wsconn_lock;
+extern char *wsconn_state_str[];
+
+int wsconn_init(void);
+void wsconn_destroy(void);
+int wsconn_add(struct tcp_connection *con);
+int wsconn_rm(ws_connection_t *wsc);
+ws_connection_t *wsconn_find(struct tcp_connection *con);
+
+#define WSCONN_LOCK	lock_get(wsconn_lock);
+#define WSCONN_UNLOCK	lock_release(wsconn_lock);
+
+#endif /* _WS_CONN_H */

+ 2 - 0
modules/websocket/ws_frame.c

@@ -26,6 +26,8 @@
 #include "../../tcp_server.h"
 #include "../../lib/kcore/kstats_wrapper.h"
 #include "../../lib/kmi/tree.h"
+#include "../../mem/mem.h"
+#include "ws_conn.h"
 #include "ws_frame.h"
 #include "ws_mod.h"
 

+ 4 - 3
modules/websocket/ws_handshake.c

@@ -31,9 +31,11 @@
 #include "../../lib/kcore/kstats_wrapper.h"
 #include "../../lib/kcore/cmpapi.h"
 #include "../../lib/kmi/tree.h"
+#include "../../mem/mem.h"
 #include "../../parser/msg_parser.h"
 #include "../sl/sl.h"
 #include "../tls/tls_cfg.h"
+#include "ws_conn.h"
 #include "ws_handshake.h"
 #include "ws_mod.h"
 
@@ -308,9 +310,8 @@ int ws_handle_handshake(struct sip_msg *msg)
 				&str_status_switching_protocols, &headers) < 0)
 		return 0;
 
-	/* Make sure Kamailio core sends future requests on this connection
-	   directly to this module */
-	con->flags |= F_CONN_WS;
+	/* Add the connection to the WebSocket connection table */
+	wsconn_add(con);
 
 	return 0;
 }

+ 54 - 30
modules/websocket/ws_mod.c

@@ -30,7 +30,9 @@
 #include "../../lib/kcore/kstats_wrapper.h"
 #include "../../lib/kmi/mi.h"
 #include "../../lib/kmi/tree.h"
+#include "../../mem/mem.h"
 #include "../../parser/msg_parser.h"
+#include "ws_conn.h"
 #include "ws_handshake.h"
 #include "ws_frame.h"
 #include "ws_mod.h"
@@ -121,111 +123,133 @@ static int mod_init(void)
 	if (sl_load_api(&ws_slb) != 0)
 	{
 		LM_ERR("binding to SL\n");
-		return -1;
+		goto error;
 	}
 
 	if (sr_event_register_cb(SREV_TCP_WS_FRAME, ws_frame_received) != 0)
 	{
 		LM_ERR("registering WebSocket call-back\n");
-		return -1;
+		goto error;
 	}
 
 	if (register_module_stats(exports.name, stats) != 0)
 	{
 		LM_ERR("registering core statistics\n");
-		return -1;
+		goto error;
 	}
 
 	if (register_mi_mod(exports.name, mi_cmds) != 0)
 	{
 		LM_ERR("registering MI commands\n");
-		return -1;
+		goto error;
+	}
+
+	if (wsconn_init() < 0)
+	{
+		LM_ERR("initialising WebSocket connections table\n");
+		goto error;
 	}
 
 	if ((ws_enabled = (int *) shm_malloc(sizeof(int))) == NULL)
 	{
 		LM_ERR("allocating shared memory\n");
-		return -1;
+		goto error;
 	}
 	*ws_enabled = 1;
 
+	if (wsconn_init() < 0)
+	{
+		LM_ERR("initialising WebSocket connections table\n");
+		goto error;
+	}
+
 	if ((ws_stats_lock = lock_alloc()) == NULL)
 	{
 		LM_ERR("allocating lock\n");
-		return -1;
+		goto error;
 	}
 	if (lock_init(ws_stats_lock) == NULL)
 	{
 		LM_ERR("initialising lock\n");
-		lock_dealloc(ws_stats_lock);
-		return -1;
+		goto error;
 	}
 
-	/* TODO: register module with core to receive WS/WSS messages */
-
 	return 0;
+
+error:
+	wsconn_destroy();
+
+	if (ws_stats_lock)
+		lock_dealloc(ws_stats_lock);
+
+	shm_free(ws_enabled);
+
+	return -1;
 }
 
 static void destroy(void)
 {
+	wsconn_destroy();
 	shm_free(ws_enabled);
 	lock_destroy(ws_stats_lock);
 	lock_dealloc(ws_stats_lock);
-
-	/* TODO: close all connections */
 }
 
 static struct mi_root *mi_dump(struct mi_root *cmd, void *param)
 {
-	int h, connections = 0;
+	int h, connections = 0, interval;
 	char *src_proto, *dst_proto;
 	char src_ip[IP6_MAX_STR_SIZE + 1], dst_ip[IP6_MAX_STR_SIZE + 1];
-	struct tcp_connection *c;
+	ws_connection_t *wsc;
 	struct mi_root *rpl_tree = init_mi_tree(200, MI_OK_S, MI_OK_LEN);
 
 	if (!rpl_tree)
 		return 0;
 
-	TCPCONN_LOCK;
+	WSCONN_LOCK;
 	for (h = 0; h < TCP_ID_HASH_SIZE; h++)
 	{
-		c = tcpconn_id_hash[h];
-		while(c)
+		wsc = wsconn_hash[h];
+		while(wsc)
 		{
-			if (c->flags & F_CONN_WS)
+			if (wsc->con)
 			{
-				src_proto = (c->rcv.proto== PROTO_TCP)
+				src_proto = (wsc->con->rcv.proto== PROTO_TCP)
 						? "tcp" : "tls";
 				memset(src_ip, 0, IP6_MAX_STR_SIZE + 1);
-				ip_addr2sbuf(&c->rcv.src_ip, src_ip,
+				ip_addr2sbuf(&wsc->con->rcv.src_ip, src_ip,
 						IP6_MAX_STR_SIZE);
 
-				dst_proto = (c->rcv.proto == PROTO_TCP)
+				dst_proto = (wsc->con->rcv.proto == PROTO_TCP)
 						? "tcp" : "tls";
 				memset(dst_ip, 0, IP6_MAX_STR_SIZE + 1);
-				ip_addr2sbuf(&c->rcv.dst_ip, src_ip,
+				ip_addr2sbuf(&wsc->con->rcv.dst_ip, src_ip,
 						IP6_MAX_STR_SIZE);
 
+				interval = (int)time(NULL) - wsc->last_used;
+
 				if (addf_mi_node_child(&rpl_tree->node, 0, 0, 0,
-						"id - %d, "
-						"src - %s:%s:%hu, "
-						"dst - %s:%s:%hu",
-						c->id,
+						"%d: %s:%s:%hu -> %s:%s:%hu "
+						"(state: %s, "
+						"last used %ds ago)",
+						wsc->con->id,
 						src_proto,
 						strlen(src_ip) ? src_ip : "*",
-						c->rcv.src_port,
+						wsc->con->rcv.src_port,
 						dst_proto,
 						strlen(dst_ip) ? dst_ip : "*",
-						c->rcv.dst_port) == 0)
+						wsc->con->rcv.dst_port,
+						wsconn_state_str[wsc->state],
+						interval) == 0)
 					return 0;
 
 				connections++;
 			}
 
-			c = c->id_next;
+			wsc = wsc->next;
 		}
 	}
-	TCPCONN_UNLOCK;
+	WSCONN_UNLOCK;
 
 	if (addf_mi_node_child(&rpl_tree->node, 0, 0, 0,
 				"%d WebSocket connection%s found",