Pārlūkot izejas kodu

modules/websocket: Closing handshake now working

- Also completed MI commands which allowed me to test Ping and Pong
Peter Dunkley 13 gadi atpakaļ
vecāks
revīzija
2f30521ea9

+ 154 - 17
modules/websocket/ws_conn.c

@@ -22,12 +22,23 @@
  */
 
 #include "../../locking.h"
+#include "../../str.h"
 #include "../../tcp_conn.h"
+#include "../../lib/kcore/kstats_wrapper.h"
+#include "../../lib/kmi/tree.h"
 #include "../../mem/mem.h"
 #include "ws_conn.h"
+#include "ws_mod.h"
+
+/* Maximum number of connections to display when using the ws.dump MI command */
+#define MAX_WS_CONNS_DUMP	50
 
 struct ws_connection **wsconn_hash = NULL;
 gen_lock_t *wsconn_lock = NULL;
+gen_lock_t *wsstat_lock = NULL;
+
+stat_var *ws_current_connections;
+stat_var *ws_max_concurrent_connections;
 
 char *wsconn_state_str[] =
 {
@@ -45,14 +56,24 @@ int wsconn_init(void)
 	if (wsconn_lock == NULL)
 	{
 		LM_ERR("allocating lock\n");
-		return -1;
+		goto error;
 	}
 	if (lock_init(wsconn_lock) == 0)
 	{
 		LM_ERR("initialising lock\n");
-		lock_dealloc((void *) wsconn_lock);
-		wsconn_lock = NULL;
-		return -1;
+		goto error;
+	}
+
+	wsstat_lock = lock_alloc();
+	if (wsstat_lock == NULL)
+	{
+		LM_ERR("allocating lock\n");
+		goto error;
+	}
+	if (lock_init(wsstat_lock) == NULL)
+	{
+		LM_ERR("initialising lock\n");
+		goto error;
 	}
 
 	wsconn_hash =
@@ -61,14 +82,19 @@ int wsconn_init(void)
 	if (wsconn_hash == NULL)
 	{
 		LM_ERR("allocating WebSocket hash-table\n");
-		lock_dealloc((void *) wsconn_lock);
-		wsconn_lock = NULL;
-		return -1;
+		goto error;
 	}
 	memset((void *) wsconn_hash, 0,
 		TCP_ID_HASH_SIZE * sizeof(ws_connection_t *));
 
 	return 0;
+
+error:
+	if (wsconn_lock) lock_dealloc((void *) wsconn_lock);
+	if (wsstat_lock) lock_dealloc((void *) wsstat_lock);
+	wsconn_lock = wsstat_lock = NULL;
+
+	return -1;
 }
 
 void wsconn_destroy(void)
@@ -77,8 +103,8 @@ void wsconn_destroy(void)
 
 	if (wsconn_hash)
 	{
-		WSCONN_UNLOCK;
-		WSCONN_LOCK;
+		lock_release(wsconn_lock);
+		lock_get(wsconn_lock);
 		for (h = 0; h < TCP_ID_HASH_SIZE; h++)
 		{
 			ws_connection_t *wsc = wsconn_hash[h];
@@ -89,7 +115,7 @@ void wsconn_destroy(void)
 				wsc = next;
 			}
 		}
-		WSCONN_UNLOCK;
+		lock_release(wsconn_lock);
 
 		shm_free(wsconn_hash);
 		wsconn_hash = NULL;
@@ -101,10 +127,18 @@ void wsconn_destroy(void)
 		lock_dealloc((void *) wsconn_lock);
 		wsconn_lock = NULL;
 	}
+
+	if (wsstat_lock)
+	{
+		lock_destroy(wsstat_lock);
+		lock_dealloc((void *) wsstat_lock);
+		wsstat_lock = NULL;
+	}
 }
 
 int wsconn_add(struct tcp_connection *con)
 {
+	int cur_cons, max_cons;
 	ws_connection_t *wsc;
 
 	if (!con)
@@ -130,12 +164,21 @@ int wsconn_add(struct tcp_connection *con)
 	   directly to this module */
 	con->flags |= F_CONN_WS;
 
-	WSCONN_LOCK;
+	lock_get(wsconn_lock);
 	wsc->next = wsconn_hash[wsc->id_hash];
 	wsc->prev = NULL;
 	if (wsconn_hash[wsc->id_hash]) wsconn_hash[wsc->id_hash]->prev = wsc;
 	wsconn_hash[wsc->id_hash] = wsc;
-	WSCONN_UNLOCK;
+	lock_release(wsconn_lock);
+
+	/* Update connection statistics */
+	lock_get(wsstat_lock);
+	update_stat(ws_current_connections, 1);
+	cur_cons = get_stat_val(ws_current_connections);
+	max_cons = get_stat_val(ws_max_concurrent_connections);
+	if (max_cons < cur_cons)
+		update_stat(ws_max_concurrent_connections, cur_cons - max_cons);
+	lock_release(wsstat_lock);
 
 	return 0;
 }
@@ -148,6 +191,7 @@ static inline void _wsconn_rm(ws_connection_t *wsc)
 	if (wsc->prev) wsc->prev->next = wsc->next;
 	shm_free(wsc);
 	wsc = NULL;
+	update_stat(ws_current_connections, -1);
 }
 
 int wsconn_rm(ws_connection_t *wsc)
@@ -158,13 +202,34 @@ int wsconn_rm(ws_connection_t *wsc)
 		return -1;
 	}
 
-	WSCONN_LOCK;
+	lock_get(wsconn_lock);
 	_wsconn_rm(wsc);
-	WSCONN_UNLOCK;
+	lock_release(wsconn_lock);
+
+	return 0;
+}
+
+int wsconn_update(ws_connection_t *wsc)
+{
+	if (!wsc)
+	{
+		LM_ERR("wsconn_rm: null pointer\n");
+		return -1;
+	}
 
+	wsc->last_used = (int) time(NULL);
 	return 0;
 }
 
+void wsconn_close_now(ws_connection_t *wsc)
+{
+	wsc->con->send_flags.f |= SND_F_CON_CLOSE;
+	wsc->con->state = S_CONN_BAD;
+	wsc->con->timeout = get_ticks_raw();
+	if (wsconn_rm(wsc) < 0)
+		LM_ERR("removing WebSocket connection\n");
+}
+
 ws_connection_t *wsconn_find(struct tcp_connection *con)
 {
 	ws_connection_t *wsc;
@@ -175,13 +240,85 @@ ws_connection_t *wsconn_find(struct tcp_connection *con)
 		return NULL;
 	}
 
-	WSCONN_LOCK;
+	lock_get(wsconn_lock);
 	for (wsc = wsconn_hash[con->id_hash]; wsc; wsc = wsc->next)
 	{
-		if (wsc->id_hash == con->id_hash)
+		if (wsc->con->id == con->id)
+		{
+			lock_release(wsconn_lock);
 			return wsc;
+		}
 	}
-	WSCONN_UNLOCK;
 
+	lock_release(wsconn_lock);
 	return NULL;
 }
+
+struct mi_root *ws_mi_dump(struct mi_root *cmd, void *param)
+{
+	int h, connections = 0, truncated = 0, interval;
+	char *src_proto, *dst_proto;
+	char src_ip[IP6_MAX_STR_SIZE + 1], dst_ip[IP6_MAX_STR_SIZE + 1];
+	ws_connection_t *wsc;
+	struct mi_root *rpl_tree = init_mi_tree(200, MI_OK_S, MI_OK_LEN);
+
+	if (!rpl_tree)
+		return 0;
+
+	lock_get(wsconn_lock);
+	for (h = 0; h < TCP_ID_HASH_SIZE; h++)
+	{
+		wsc = wsconn_hash[h];
+		while(wsc)
+		{
+			if (wsc->con)
+			{
+				src_proto = (wsc->con->rcv.proto== PROTO_TCP)
+						? "tcp" : "tls";
+				memset(src_ip, 0, IP6_MAX_STR_SIZE + 1);
+				ip_addr2sbuf(&wsc->con->rcv.src_ip, src_ip,
+						IP6_MAX_STR_SIZE);
+
+				dst_proto = (wsc->con->rcv.proto == PROTO_TCP)
+						? "tcp" : "tls";
+				memset(dst_ip, 0, IP6_MAX_STR_SIZE + 1);
+				ip_addr2sbuf(&wsc->con->rcv.dst_ip, src_ip,
+						IP6_MAX_STR_SIZE);
+
+				interval = (int)time(NULL) - wsc->last_used;
+
+				if (addf_mi_node_child(&rpl_tree->node, 0, 0, 0,
+						"%d: %s:%s:%hu -> %s:%s:%hu "
+						"(state: %s, "
+						"last used %ds ago)",
+						wsc->con->id,
+						src_proto,
+						strlen(src_ip) ? src_ip : "*",
+						wsc->con->rcv.src_port,
+						dst_proto,
+						strlen(dst_ip) ? dst_ip : "*",
+						wsc->con->rcv.dst_port,
+						wsconn_state_str[wsc->state],
+						interval) == 0)
+					return 0;
+
+				if (++connections == MAX_WS_CONNS_DUMP)
+				{
+					truncated = 1;
+					break;
+				}
+			}
+
+			wsc = wsc->next;
+		}
+	}
+	lock_release(wsconn_lock);
+
+	if (addf_mi_node_child(&rpl_tree->node, 0, 0, 0,
+				"%d WebSocket connection%s found%s",
+				connections, connections == 1 ? "" : "s",
+				truncated == 1 ? "(truncated)" : "") == 0)
+		return 0;
+
+	return rpl_tree;
+}

+ 12 - 9
modules/websocket/ws_conn.h

@@ -26,13 +26,14 @@
 
 #include "../../locking.h"
 #include "../../tcp_conn.h"
+#include "../../lib/kmi/tree.h"
 
 typedef enum
 {
-	WS_S_CONNECTING	= 0,
+	WS_S_CONNECTING	= 0,	/* Never used - included for completeness */
 	WS_S_OPEN,
 	WS_S_CLOSING,
-	WS_S_CLOSED
+	WS_S_CLOSED		/* Never used - included for completeness */
 } ws_conn_state_t;
 
 typedef struct ws_connection
@@ -40,24 +41,26 @@ typedef struct ws_connection
 	struct tcp_connection *con;
 
 	ws_conn_state_t state;
-	unsigned int id_hash;
-	unsigned int last_used;
+	int id;
+	unsigned id_hash;
+	int last_used;
 
 	struct ws_connection *prev;
 	struct ws_connection *next;
 } ws_connection_t;
 
-extern ws_connection_t **wsconn_hash;
-extern gen_lock_t *wsconn_lock;
 extern char *wsconn_state_str[];
 
+extern stat_var *ws_current_connections;
+extern stat_var *ws_max_concurrent_connections;
+
 int wsconn_init(void);
 void wsconn_destroy(void);
 int wsconn_add(struct tcp_connection *con);
 int wsconn_rm(ws_connection_t *wsc);
+int wsconn_update(ws_connection_t *wsc);
+void wsconn_close_now(ws_connection_t *wsc);
 ws_connection_t *wsconn_find(struct tcp_connection *con);
-
-#define WSCONN_LOCK	lock_get(wsconn_lock);
-#define WSCONN_UNLOCK	lock_release(wsconn_lock);
+struct mi_root *ws_mi_dump(struct mi_root *cmd, void *param);
 
 #endif /* _WS_CONN_H */

+ 236 - 57
modules/websocket/ws_frame.c

@@ -22,6 +22,7 @@
  */
 
 #include <limits.h>
+#include "../../str.h"
 #include "../../tcp_conn.h"
 #include "../../tcp_server.h"
 #include "../../lib/kcore/kstats_wrapper.h"
@@ -60,9 +61,15 @@ typedef struct {
 	unsigned int payload_len;
 	unsigned char masking_key[4];
 	char *payload_data;
-	tcp_event_info_t *tcpinfo;
+	ws_connection_t *wsc;
 } ws_frame_t;
 
+typedef enum
+{
+	CONN_CLOSE_DO = 0,
+	CONN_CLOSE_DONT
+} conn_close_t;
+
 #define BYTE0_MASK_FIN		(0x80)
 #define BYTE0_MASK_RSV1		(0x40)
 #define BYTE0_MASK_RSV2		(0x20)
@@ -80,19 +87,52 @@ typedef struct {
 #define OPCODE_PONG		(0xa)
 /* 0xb - 0xf are reserved for further control frames */
 
-
-static int decode_and_validate_ws_frame(ws_frame_t *frame)
+static int close_connection(ws_connection_t *wsc, ws_close_type_t type,
+				short int status, str reason);
+
+stat_var *ws_failed_connections;
+stat_var *ws_local_closed_connections;
+stat_var *ws_received_frames;
+stat_var *ws_remote_closed_connections;
+stat_var *ws_transmitted_frames;
+
+/* WebSocket status text */
+static str str_status_normal_closure = str_init("Normal closure");
+static str str_status_protocol_error = str_init("Protocol error");
+static str str_status_unsupported_opcode = str_init("Unsupported opcode");
+static str str_status_message_too_big = str_init("Message too big");
+
+/* MI command status text */
+static str str_status_empty_param = str_init("Empty connection ID parameter");
+static str str_status_too_many_params = str_init("Too many parameters");
+static str str_status_bad_param = str_init("Bad connection ID parameter");
+static str str_status_error_closing = str_init("Error closing connection");
+static str str_status_error_sending = str_init("Error sending frame");
+
+static int decode_and_validate_ws_frame(ws_frame_t *frame,
+					tcp_event_info_t *tcpinfo)
 {
-	unsigned int i, len=frame->tcpinfo->len;
+	unsigned int i, len = tcpinfo->len;
 	int mask_start, j;
-	char *buf = frame->tcpinfo->buf;
+	char *buf = tcpinfo->buf;
 
 	LM_INFO("decoding WebSocket frame\n");
 
+	if ((frame->wsc = wsconn_find(tcpinfo->con)) == NULL)
+	{
+		LM_WARN("WebSocket connection not found\n");
+		return -1;
+	}
+
+	wsconn_update(frame->wsc);
+
 	/* Decode and validate first 9 bits */
 	if (len < 2)
 	{
 		LM_WARN("message is too short\n");
+		if (close_connection(frame->wsc, LOCAL_CLOSE, 1002,
+					str_status_protocol_error) < 0)
+			LM_ERR("closing connection\n");
 		return -1;
 	}
 	frame->fin = (buf[0] & 0xff) & BYTE0_MASK_FIN;
@@ -106,12 +146,18 @@ static int decode_and_validate_ws_frame(ws_frame_t *frame)
 	{
 		LM_WARN("WebSocket fragmentation not supported in the sip "
 			"sub-protocol\n");
+		if (close_connection(frame->wsc, LOCAL_CLOSE, 1002,
+					str_status_protocol_error) < 0)
+			LM_ERR("closing connection\n");
 		return -1;
 	}
 
 	if (frame->rsv1 || frame->rsv2 || frame->rsv3)
 	{
 		LM_WARN("WebSocket reserved fields with non-zero values\n");
+		if (close_connection(frame->wsc, LOCAL_CLOSE, 1002,
+					str_status_protocol_error) < 0)
+			LM_ERR("closing connection\n");
 		return -1;
 	}
 
@@ -133,6 +179,9 @@ static int decode_and_validate_ws_frame(ws_frame_t *frame)
 	default:
 		LM_WARN("unsupported opcode: 0x%x\n",
 			(unsigned char) frame->opcode);
+		if (close_connection(frame->wsc, LOCAL_CLOSE, 1008,
+					str_status_unsupported_opcode) < 0)
+			LM_ERR("closing connection\n");
 		return -1;
 	}
 
@@ -140,6 +189,9 @@ static int decode_and_validate_ws_frame(ws_frame_t *frame)
 	{
 		LM_WARN("this is a server - all received messages must be "
 			"masked\n");
+		if (close_connection(frame->wsc, LOCAL_CLOSE, 1002,
+					str_status_protocol_error) < 0)
+			LM_ERR("closing connection\n");
 		return -1;
 	}
 
@@ -150,6 +202,9 @@ static int decode_and_validate_ws_frame(ws_frame_t *frame)
 		if (len < 4)
 		{
 			LM_WARN("message is too short\n");
+			if (close_connection(frame->wsc, LOCAL_CLOSE, 1002,
+						str_status_protocol_error) < 0)
+				LM_ERR("closing connection\n");
 			return -1;
 		}
 		mask_start = 4;
@@ -162,10 +217,23 @@ static int decode_and_validate_ws_frame(ws_frame_t *frame)
 		if (len < 10)
 		{
 			LM_WARN("message is too short\n");
+			if (close_connection(frame->wsc, LOCAL_CLOSE, 1002,
+						str_status_protocol_error) < 0)
+				LM_ERR("closing connection\n");
 			return -1;
 		}
 		mask_start = 10;
 
+		if ((buf[2] & 0xff) != 0 || (buf[3] & 0xff) != 0
+			|| (buf[4] & 0xff) != 0 || (buf[5] & 0xff) != 0)
+		{
+			LM_WARN("message is too long\n");
+			if (close_connection(frame->wsc, LOCAL_CLOSE, 1009,
+						str_status_message_too_big) < 0)
+				LM_ERR("closing connection\n");
+			return -1;
+		}
+
 		/* Only decoding the last four bytes of the length...
 		   This limits the size of WebSocket messages that can be
 		   handled to 2^32 = which should be plenty for SIP! */
@@ -188,6 +256,9 @@ static int decode_and_validate_ws_frame(ws_frame_t *frame)
 	{
 		LM_WARN("message not complete frame size %u but received %u\n",
 			frame->payload_len + mask_start + 4, len);
+		if (close_connection(frame->wsc, LOCAL_CLOSE, 1002,
+					str_status_protocol_error) < 0)
+			LM_ERR("closing connection\n");
 		return -1;
 	}
 	frame->payload_data = &buf[mask_start + 4];
@@ -204,7 +275,7 @@ static int decode_and_validate_ws_frame(ws_frame_t *frame)
 	return frame->opcode;
 }
 
-static int encode_and_send_ws_frame(ws_frame_t *frame, int conn_close)
+static int encode_and_send_ws_frame(ws_frame_t *frame, conn_close_t conn_close)
 {
 	int pos = 0, extended_length;
 	unsigned int frame_length;
@@ -213,6 +284,15 @@ static int encode_and_send_ws_frame(ws_frame_t *frame, int conn_close)
 
 	LM_INFO("encoding WebSocket frame\n");
 
+	if (frame->wsc->state != WS_S_OPEN)
+	{
+		LM_ERR("sending on closing connection\n");
+		wsconn_close_now(frame->wsc);
+		return -1;
+	}
+
+	wsconn_update(frame->wsc);
+
 	/* Validate the first byte */
 	if (!frame->fin)
 	{
@@ -292,23 +372,84 @@ static int encode_and_send_ws_frame(ws_frame_t *frame, int conn_close)
 	}
 	memcpy(&send_buf[pos], frame->payload_data, frame->payload_len);
 
-	init_dst_from_rcv(&dst, &frame->tcpinfo->con->rcv);
-	if (conn_close) dst.send_flags.f |= SND_F_CON_CLOSE;
+	init_dst_from_rcv(&dst, &frame->wsc->con->rcv);
+	if (conn_close == CONN_CLOSE_DO)
+	{
+		dst.send_flags.f |= SND_F_CON_CLOSE;
+		if (wsconn_rm(frame->wsc) < 0)
+		{
+			LM_ERR("removing WebSocket connection\n");
+			return -1;
+		}
+	}
 
 	if (tcp_send(&dst, NULL, send_buf, frame_length) < 0)
 	{
 		LM_ERR("sending WebSocket frame\n");
 		pkg_free(send_buf);
 		update_stat(ws_failed_connections, 1);
+		if (wsconn_rm(frame->wsc) < 0)
+			LM_ERR("removing WebSocket connection\n");
 		return -1;
 	}
-	
+
 	update_stat(ws_transmitted_frames, 1);
 
 	pkg_free(send_buf);
 	return 0;
 }
 
+static int close_connection(ws_connection_t *wsc, ws_close_type_t type,
+				short int status, str reason)
+{
+	char *data;
+	ws_frame_t frame;
+
+	data = pkg_malloc(sizeof(char) * (reason.len + 2));
+	if (data == NULL)
+	{
+		LM_ERR("allocating pkg memory\n");
+		return -1;
+	}
+
+	if (wsc->state == WS_S_OPEN)
+	{
+		data[0] = (status & 0xff00) >> 8;
+		data[1] = (status & 0x00ff) >> 0;
+		memcpy(&data[2], reason.s, reason.len);
+
+		memset(&frame, 0, sizeof(frame));
+		frame.fin = 1;
+		frame.opcode = OPCODE_CLOSE;
+		frame.payload_len = reason.len + 2;
+		frame.payload_data = data;
+		frame.wsc = wsc;
+
+		if (encode_and_send_ws_frame(&frame,
+			type ==
+			REMOTE_CLOSE ? CONN_CLOSE_DO : CONN_CLOSE_DONT) < 0)
+		{	
+			LM_ERR("sending WebSocket close\n");
+			pkg_free(data);
+			return -1;
+		}
+
+		pkg_free(data);
+
+		if (type == LOCAL_CLOSE)
+		{
+			frame.wsc->state = WS_S_CLOSING;
+			update_stat(ws_local_closed_connections, 1);
+		}
+		else
+			update_stat(ws_remote_closed_connections, 1);
+	}
+	else /* if (frame->wsc->state == WS_S_CLOSING) */
+		wsconn_close_now(wsc);
+
+	return 0;
+}
+
 static int handle_sip_message(ws_frame_t *frame)
 {
 	LM_INFO("Received SIP message\n");
@@ -323,10 +464,6 @@ static int handle_close(ws_frame_t *frame)
 	unsigned short code = 0;
 	str reason = {0, 0};
 
-	update_stat(ws_remote_closed_connections, 1);
-	update_stat(ws_current_connections, -1);
-	LM_INFO("Received Close\n");
-
 	if (frame->payload_len >= 2)
 		code =    ((frame->payload_data[0] & 0xff) << 8)
 			| ((frame->payload_data[1] & 0xff) << 0);
@@ -337,31 +474,33 @@ static int handle_close(ws_frame_t *frame)
 		reason.len = frame->payload_len - 2;
 	}
 
-	LM_INFO("Close: %hu %.*s\n", code, reason.len, reason.s); 
-
-	/* Close socket */
-	frame->tcpinfo->con->state = S_CONN_BAD;
-	frame->tcpinfo->con->timeout = get_ticks_raw();
+	LM_INFO("Received Close: %hu %.*s\n", code, reason.len, reason.s);
 
+	if (close_connection(frame->wsc,
+		frame->wsc->state == WS_S_OPEN ? REMOTE_CLOSE : LOCAL_CLOSE,
+		1000, str_status_normal_closure) < 0)
+	{
+		LM_ERR("closing connection\n");
+		return -1;
+	}
+	
 	return 0;
 }
 
 static int handle_ping(ws_frame_t *frame)
 {
-	LM_INFO("Received Ping\n");
+	LM_INFO("Received Ping: %.*s\n",
+		frame->payload_len, frame->payload_data);
 
 	frame->opcode = OPCODE_PONG;
 	frame->mask = 0;
-
-	encode_and_send_ws_frame(frame, 0);
+	encode_and_send_ws_frame(frame, CONN_CLOSE_DONT);
 
 	return 0;
 }
 
 static int handle_pong(ws_frame_t *frame)
 {
-	LM_INFO("Received Pong\n");
-
 	LM_INFO("Pong: %.*s\n", frame->payload_len, frame->payload_data);
 
 	return 0;
@@ -370,18 +509,17 @@ static int handle_pong(ws_frame_t *frame)
 int ws_frame_received(void *data)
 {
 	ws_frame_t ws_frame;
-	tcp_event_info_t *tev = (tcp_event_info_t *) data;
+	tcp_event_info_t *tcpinfo = (tcp_event_info_t *) data;
 
 	update_stat(ws_received_frames, 1);
 
-	if (tev == NULL || tev->buf == NULL || tev->len <= 0)
+	if (tcpinfo == NULL || tcpinfo->buf == NULL || tcpinfo->len <= 0)
 	{
 		LM_WARN("received bad frame\n");
 		return -1;
 	}
 
-	ws_frame.tcpinfo = tev;
-	switch(decode_and_validate_ws_frame(&ws_frame))
+	switch(decode_and_validate_ws_frame(&ws_frame, tcpinfo))
 	{
 	case OPCODE_TEXT_FRAME:
 	case OPCODE_BINARY_FRAME:
@@ -428,11 +566,7 @@ struct mi_root *ws_mi_close(struct mi_root *cmd, void *param)
 {
 	unsigned int id;
 	struct mi_node *node = NULL;
-	ws_frame_t frame;
-	tcp_event_info_t tcpinfo;
-	short int code = 1000;
-	str reason = str_init("Normal Closure");
-	char *data;
+	ws_connection_t *wsc;
 
 	node = cmd->node.kids;
 	if (node == NULL)
@@ -440,7 +574,8 @@ struct mi_root *ws_mi_close(struct mi_root *cmd, void *param)
 	if (node->value.s == NULL || node->value.len == 0)
 	{
 		LM_ERR("empty connection ID parameter\n");
-		return init_mi_tree(400, "Empty connection ID parameter", 29);
+		return init_mi_tree(400, str_status_empty_param.s,
+					str_status_empty_param.len);
 	}
 	if (str2int(&node->value, &id) < 0)
 	{
@@ -450,54 +585,98 @@ struct mi_root *ws_mi_close(struct mi_root *cmd, void *param)
 	if (node->next != NULL)
 	{
 		LM_ERR("too many parameters\n");
-		return init_mi_tree(400, "Too many parameters", 19);
+		return init_mi_tree(400, str_status_too_many_params.s,
+					str_status_too_many_params.len);
 	}
 
-	if ((tcpinfo.con = tcpconn_get(id, 0, 0, 0, 0)) == NULL)
+	if ((wsc = wsconn_find(tcpconn_get(id, 0, 0, 0, 0))) == NULL)
 	{
 		LM_ERR("bad connection ID parameter\n");
-		return init_mi_tree(400, "Bad connection ID parameter", 27);
+		return init_mi_tree(400, str_status_bad_param.s,
+					str_status_bad_param.len);
 	}
 
-	if ((data = pkg_malloc(sizeof(char) * (reason.len + 2))) == NULL)
+	if (close_connection(wsc, LOCAL_CLOSE, 1000,
+				str_status_normal_closure) < 0)
 	{
-		LM_ERR("allocating pkg memory\n");
-		return 0;
+		LM_ERR("closing connection\n");
+		return init_mi_tree(500, str_status_error_closing.s,
+					str_status_error_closing.len);
 	}
 
-	data[0] = (code & 0xff00) >> 8;
-	data[1] = (code & 0x00ff) >> 0;
-	memcpy(&data[2], reason.s, reason.len);
+	return init_mi_tree(200, MI_OK_S, MI_OK_LEN);
+}
+
+static int ping_pong(ws_connection_t *wsc, int opcode)
+{
+	ws_frame_t frame;
 
 	memset(&frame, 0, sizeof(frame));
 	frame.fin = 1;
-	frame.opcode = OPCODE_CLOSE;
-	frame.payload_len = reason.len + 2;
-	frame.payload_data = data;
-	frame.tcpinfo = &tcpinfo;
+	frame.opcode = opcode;
+	frame.payload_len = server_hdr.len;
+	frame.payload_data = server_hdr.s;
+	frame.wsc = wsc;
+
+	if (encode_and_send_ws_frame(&frame, CONN_CLOSE_DONT) < 0)
+	{	
+		LM_ERR("closing connection\n");
+		return -1;
+	}
+
+	return 0;
+}
+
+static struct mi_root *mi_ping_pong(struct mi_root *cmd, void *param,
+					int opcode)
+{
+	unsigned int id;
+	struct mi_node *node = NULL;
+	ws_connection_t *wsc;
 
-	if (encode_and_send_ws_frame(&frame, 1) < 0)
+	node = cmd->node.kids;
+	if (node == NULL)
+		return 0;
+	if (node->value.s == NULL || node->value.len == 0)
 	{
-		LM_ERR("sending WebSocket close\n");
-		pkg_free(data);
-		return init_mi_tree(500,"Sending WebSocket close", 23);
+		LM_ERR("empty connection ID parameter\n");
+		return init_mi_tree(400, str_status_empty_param.s,
+					str_status_empty_param.len);
+	}
+	if (str2int(&node->value, &id) < 0)
+	{
+		LM_ERR("converting string to int\n");
+		return 0;
+	}
+	if (node->next != NULL)
+	{
+		LM_ERR("too many parameters\n");
+		return init_mi_tree(400, str_status_too_many_params.s,
+					str_status_too_many_params.len);
 	}
 
-	update_stat(ws_local_closed_connections, 1);
-	update_stat(ws_current_connections, -1);
+	if ((wsc = wsconn_find(tcpconn_get(id, 0, 0, 0, 0))) == NULL)
+	{
+		LM_ERR("bad connection ID parameter\n");
+		return init_mi_tree(400, str_status_bad_param.s,
+					str_status_bad_param.len);
+	}
+
+	if (ping_pong(wsc, opcode) < 0)
+	{
+		return init_mi_tree(500, str_status_error_sending.s,
+					str_status_error_sending.len);
+	}
 
-	pkg_free(data);
 	return init_mi_tree(200, MI_OK_S, MI_OK_LEN);
 }
 
 struct mi_root *ws_mi_ping(struct mi_root *cmd, void *param)
 {
-	/* TODO Ping specified connection */
-	return init_mi_tree(200, MI_OK_S, MI_OK_LEN);
+	return mi_ping_pong(cmd, param, OPCODE_PING);
 }
 
 struct mi_root *ws_mi_pong(struct mi_root *cmd, void *param)
 {
-	/* TODO Pong specified connection */
-	return init_mi_tree(200, MI_OK_S, MI_OK_LEN);
+	return mi_ping_pong(cmd, param, OPCODE_PONG);
 }

+ 14 - 0
modules/websocket/ws_frame.h

@@ -25,7 +25,21 @@
 #define _WS_FRAME_H
 
 #include "../../sr_module.h"
+#include "../../str.h"
 #include "../../lib/kmi/tree.h"
+#include "ws_conn.h"
+
+typedef enum
+{
+	LOCAL_CLOSE = 0,
+	REMOTE_CLOSE
+} ws_close_type_t;
+
+extern stat_var *ws_failed_connections;
+extern stat_var *ws_local_closed_connections;
+extern stat_var *ws_received_frames;
+extern stat_var *ws_remote_closed_connections;
+extern stat_var *ws_transmitted_frames;
 
 int ws_frame_received(void *data);
 struct mi_root *ws_mi_close(struct mi_root *cmd, void *param);

+ 7 - 19
modules/websocket/ws_handshake.c

@@ -27,6 +27,7 @@
 #include "../../data_lump_rpl.h"
 #include "../../dprint.h"
 #include "../../locking.h"
+#include "../../str.h"
 #include "../../tcp_conn.h"
 #include "../../lib/kcore/kstats_wrapper.h"
 #include "../../lib/kcore/cmpapi.h"
@@ -41,6 +42,9 @@
 
 #define WS_VERSION		(13)
 
+stat_var *ws_failed_handshakes;
+stat_var *ws_successful_handshakes;
+
 static str str_sip = str_init("sip");
 static str str_upgrade = str_init("upgrade");
 static str str_websocket = str_init("websocket");
@@ -79,8 +83,6 @@ static char key_buf[KEY_BUF_LEN];
 
 static int ws_send_reply(sip_msg_t *msg, int code, str *reason, str *hdrs)
 {
-	int cur_cons, max_cons;
-
 	if (hdrs && hdrs->len > 0)
 	{
 		if (add_lump_rpl(msg, hdrs->s, hdrs->len, LUMP_RPL_HDR) == 0)
@@ -98,23 +100,9 @@ static int ws_send_reply(sip_msg_t *msg, int code, str *reason, str *hdrs)
 		return -1;
 	}
 
-	if (code == 101)
-	{
-		update_stat(ws_successful_handshakes, 1);
-
-		lock_get(ws_stats_lock);
-		update_stat(ws_current_connections, 1);
-
-		cur_cons = get_stat_val(ws_current_connections);
-		max_cons = get_stat_val(ws_max_concurrent_connections);
-
-		if (max_cons < cur_cons)
-			update_stat(ws_max_concurrent_connections,
-						cur_cons - max_cons);
-		lock_release(ws_stats_lock);
-	}
-	else
-		update_stat(ws_failed_handshakes, 1);
+	update_stat(
+		code == 101 ? ws_successful_handshakes : ws_failed_handshakes,
+		1);
 
 	return 0;
 }

+ 3 - 0
modules/websocket/ws_handshake.h

@@ -27,6 +27,9 @@
 #include "../../sr_module.h"
 #include "../../parser/msg_parser.h"
 
+stat_var *ws_failed_handshakes;
+stat_var *ws_successful_handshakes;
+
 int ws_handle_handshake(struct sip_msg *msg);
 struct mi_root *ws_mi_disable(struct mi_root *cmd, void *param);
 struct mi_root *ws_mi_enable(struct mi_root *cmd, void *param);

+ 11 - 115
modules/websocket/ws_mod.c

@@ -29,7 +29,6 @@
 #include "../../tcp_conn.h"
 #include "../../lib/kcore/kstats_wrapper.h"
 #include "../../lib/kmi/mi.h"
-#include "../../lib/kmi/tree.h"
 #include "../../mem/mem.h"
 #include "../../parser/msg_parser.h"
 #include "ws_conn.h"
@@ -42,29 +41,13 @@ MODULE_VERSION
 /* Maximum number of connections to display when using the ws.dump MI command */
 #define MAX_WS_CONNS_DUMP	50
 
-extern gen_lock_t *tcpconn_lock;
-extern struct tcp_connection **tcpconn_id_hash;
-
 static int mod_init(void);
 static void destroy(void);
 
 sl_api_t ws_slb;
 int *ws_enabled;
-gen_lock_t *ws_stats_lock;
-
-int ws_ping_interval = 30;	/* time (in seconds) between sending Pings */
-
-stat_var *ws_current_connections;
-stat_var *ws_failed_connections;
-stat_var *ws_failed_handshakes;
-stat_var *ws_local_closed_connections;
-stat_var *ws_max_concurrent_connections;
-stat_var *ws_received_frames;
-stat_var *ws_remote_closed_connections;
-stat_var *ws_successful_handshakes;
-stat_var *ws_transmitted_frames;
 
-static struct mi_root *mi_dump(struct mi_root *cmd, void *param);
+int ws_ping_interval = 180;	/* time (in seconds) between sending Pings */
 
 static cmd_export_t cmds[]= 
 {
@@ -77,19 +60,24 @@ static cmd_export_t cmds[]=
 static param_export_t params[]=
 {
 	{ "ping_interval",	INT_PARAM, &ws_ping_interval },
-	{ 0, 0 }
+	{ 0, 0, 0 }
 };
 
 static stat_export_t stats[] =
 {
+	/* ws_conn.c */
 	{ "ws_current_connections",       0, &ws_current_connections },
-	{ "ws_failed_connections",        0, &ws_failed_connections },
+	{ "ws_max_concurrent_connections",0, &ws_max_concurrent_connections },
+
+	/* ws_handshake.c */
 	{ "ws_failed_handshakes",         0, &ws_failed_handshakes },
+	{ "ws_successful_handshakes",     0, &ws_successful_handshakes },
+
+	/* ws_frame.c */
+	{ "ws_failed_connections",        0, &ws_failed_connections },
 	{ "ws_local_closed_connections",  0, &ws_local_closed_connections },
-	{ "ws_max_concurrent_connections",0, &ws_max_concurrent_connections },
 	{ "ws_received_frames",           0, &ws_received_frames },
 	{ "ws_remote_closed_connections", 0, &ws_remote_closed_connections },
-	{ "ws_successful_handshakes",     0, &ws_successful_handshakes },
 	{ "ws_transmitted_frames",        0, &ws_transmitted_frames },
 	{ 0, 0, 0 }
 };
@@ -98,7 +86,7 @@ static mi_export_t mi_cmds[] =
 {
 	{ "ws.close",   ws_mi_close,   0, 0, 0 },
 	{ "ws.disable", ws_mi_disable, 0, 0, 0 },
-	{ "ws.dump",	mi_dump,       0, 0, 0 },
+	{ "ws.dump",	ws_mi_dump,    0, 0, 0 },
 	{ "ws.enable",	ws_mi_enable,  0, 0, 0 },
 	{ "ws.ping",    ws_mi_ping,    0, 0, 0 },
 	{ "ws.pong",	ws_mi_pong,    0, 0, 0 },
@@ -160,31 +148,10 @@ static int mod_init(void)
 	}
 	*ws_enabled = 1;
 
-	if (wsconn_init() < 0)
-	{
-		LM_ERR("initialising WebSocket connections table\n");
-		goto error;
-	}
-
-	if ((ws_stats_lock = lock_alloc()) == NULL)
-	{
-		LM_ERR("allocating lock\n");
-		goto error;
-	}
-	if (lock_init(ws_stats_lock) == NULL)
-	{
-		LM_ERR("initialising lock\n");
-		goto error;
-	}
-
 	return 0;
 
 error:
 	wsconn_destroy();
-
-	if (ws_stats_lock)
-		lock_dealloc(ws_stats_lock);
-
 	shm_free(ws_enabled);
 
 	return -1;
@@ -194,75 +161,4 @@ static void destroy(void)
 {
 	wsconn_destroy();
 	shm_free(ws_enabled);
-	lock_destroy(ws_stats_lock);
-	lock_dealloc(ws_stats_lock);
-}
-
-static struct mi_root *mi_dump(struct mi_root *cmd, void *param)
-{
-	int h, connections = 0, truncated = 0, interval;
-	char *src_proto, *dst_proto;
-	char src_ip[IP6_MAX_STR_SIZE + 1], dst_ip[IP6_MAX_STR_SIZE + 1];
-	ws_connection_t *wsc;
-	struct mi_root *rpl_tree = init_mi_tree(200, MI_OK_S, MI_OK_LEN);
-
-	if (!rpl_tree)
-		return 0;
-
-	WSCONN_LOCK;
-	for (h = 0; h < TCP_ID_HASH_SIZE; h++)
-	{
-		wsc = wsconn_hash[h];
-		while(wsc)
-		{
-			if (wsc->con)
-			{
-				src_proto = (wsc->con->rcv.proto== PROTO_TCP)
-						? "tcp" : "tls";
-				memset(src_ip, 0, IP6_MAX_STR_SIZE + 1);
-				ip_addr2sbuf(&wsc->con->rcv.src_ip, src_ip,
-						IP6_MAX_STR_SIZE);
-
-				dst_proto = (wsc->con->rcv.proto == PROTO_TCP)
-						? "tcp" : "tls";
-				memset(dst_ip, 0, IP6_MAX_STR_SIZE + 1);
-				ip_addr2sbuf(&wsc->con->rcv.dst_ip, src_ip,
-						IP6_MAX_STR_SIZE);
-
-				interval = (int)time(NULL) - wsc->last_used;
-
-				if (addf_mi_node_child(&rpl_tree->node, 0, 0, 0,
-						"%d: %s:%s:%hu -> %s:%s:%hu "
-						"(state: %s, "
-						"last used %ds ago)",
-						wsc->con->id,
-						src_proto,
-						strlen(src_ip) ? src_ip : "*",
-						wsc->con->rcv.src_port,
-						dst_proto,
-						strlen(dst_ip) ? dst_ip : "*",
-						wsc->con->rcv.dst_port,
-						wsconn_state_str[wsc->state],
-						interval) == 0)
-					return 0;
-
-				if (++connections == MAX_WS_CONNS_DUMP)
-				{
-					truncated = 1;
-					break;
-				}
-			}
-
-			wsc = wsc->next;
-		}
-	}
-	WSCONN_UNLOCK;
-
-	if (addf_mi_node_child(&rpl_tree->node, 0, 0, 0,
-				"%d WebSocket connection%s found%s",
-				connections, connections == 1 ? "" : "s",
-				truncated == 1 ? "(truncated)" : "") == 0)
-		return 0;
-
-	return rpl_tree;
 }

+ 0 - 10
modules/websocket/ws_mod.h

@@ -34,14 +34,4 @@ extern gen_lock_t *ws_stats_lock;
 
 extern int ws_ping_interval;	/* time (in seconds) between sending Pings */
 
-extern stat_var *ws_current_connections;
-extern stat_var *ws_failed_connections;
-extern stat_var *ws_failed_handshakes;
-extern stat_var *ws_local_closed_connections;
-extern stat_var *ws_max_concurrent_connections;
-extern stat_var *ws_received_frames;
-extern stat_var *ws_remote_closed_connections;
-extern stat_var *ws_successful_handshakes;
-extern stat_var *ws_transmitted_frames;
-
 #endif /* _WS_MOD_H */