Browse Source

Merge pull request #98343 from Faless/ws/wslay_unbuffered

[WS] Implement wslay unbuffered message parsing
Thaddeus Crews 11 months ago
parent
commit
7ff6591808

+ 1 - 1
modules/websocket/doc_classes/WebSocketMultiplayerPeer.xml

@@ -60,7 +60,7 @@
 		<member name="inbound_buffer_size" type="int" setter="set_inbound_buffer_size" getter="get_inbound_buffer_size" default="65535">
 			The inbound buffer size for connected peers. See [member WebSocketPeer.inbound_buffer_size] for more details.
 		</member>
-		<member name="max_queued_packets" type="int" setter="set_max_queued_packets" getter="get_max_queued_packets" default="2048">
+		<member name="max_queued_packets" type="int" setter="set_max_queued_packets" getter="get_max_queued_packets" default="4096">
 			The maximum number of queued packets for connected peers. See [member WebSocketPeer.max_queued_packets] for more details.
 		</member>
 		<member name="outbound_buffer_size" type="int" setter="set_outbound_buffer_size" getter="get_outbound_buffer_size" default="65535">

+ 1 - 1
modules/websocket/doc_classes/WebSocketPeer.xml

@@ -162,7 +162,7 @@
 		<member name="inbound_buffer_size" type="int" setter="set_inbound_buffer_size" getter="get_inbound_buffer_size" default="65535">
 			The size of the input buffer in bytes (roughly the maximum amount of memory that will be allocated for the inbound packets).
 		</member>
-		<member name="max_queued_packets" type="int" setter="set_max_queued_packets" getter="get_max_queued_packets" default="2048">
+		<member name="max_queued_packets" type="int" setter="set_max_queued_packets" getter="get_max_queued_packets" default="4096">
 			The maximum amount of packets that will be allowed in the queues (both inbound and outbound).
 		</member>
 		<member name="outbound_buffer_size" type="int" setter="set_outbound_buffer_size" getter="get_outbound_buffer_size" default="65535">

+ 8 - 0
modules/websocket/packet_buffer.h

@@ -104,6 +104,14 @@ public:
 		return _queued;
 	}
 
+	int payload_space_left() const {
+		return _payload.space_left();
+	}
+
+	int packets_space_left() const {
+		return _packets.size() - _queued;
+	}
+
 	void clear() {
 		_payload.resize(0);
 		_packets.resize(0);

+ 1 - 1
modules/websocket/websocket_peer.h

@@ -71,7 +71,7 @@ protected:
 
 	int outbound_buffer_size = DEFAULT_BUFFER_SIZE;
 	int inbound_buffer_size = DEFAULT_BUFFER_SIZE;
-	int max_queued_packets = 2048;
+	int max_queued_packets = 4096;
 	uint64_t heartbeat_interval_msec = 0;
 
 public:

+ 46 - 14
modules/websocket/wsl_peer.cpp

@@ -295,6 +295,7 @@ Error WSLPeer::_do_server_handshake() {
 			resolver.stop();
 			// Response sent, initialize wslay context.
 			wslay_event_context_server_init(&wsl_ctx, &_wsl_callbacks, this);
+			wslay_event_config_set_no_buffering(wsl_ctx, 1);
 			wslay_event_config_set_max_recv_msg_length(wsl_ctx, inbound_buffer_size);
 			in_buffer.resize(nearest_shift(inbound_buffer_size), max_queued_packets);
 			packet_buffer.resize(inbound_buffer_size);
@@ -403,6 +404,7 @@ void WSLPeer::_do_client_handshake() {
 					ERR_FAIL_MSG("Invalid response headers.");
 				}
 				wslay_event_context_client_init(&wsl_ctx, &_wsl_callbacks, this);
+				wslay_event_config_set_no_buffering(wsl_ctx, 1);
 				wslay_event_config_set_max_recv_msg_length(wsl_ctx, inbound_buffer_size);
 				in_buffer.resize(nearest_shift(inbound_buffer_size), max_queued_packets);
 				packet_buffer.resize(inbound_buffer_size);
@@ -568,8 +570,15 @@ ssize_t WSLPeer::_wsl_recv_callback(wslay_event_context_ptr ctx, uint8_t *data,
 		wslay_event_set_error(ctx, WSLAY_ERR_CALLBACK_FAILURE);
 		return -1;
 	}
+	// Make sure we don't read more than what our buffer can hold.
+	size_t buffer_limit = MIN(peer->in_buffer.payload_space_left(), peer->in_buffer.packets_space_left() * 2); // The minimum size of a websocket message is 2 bytes.
+	size_t to_read = MIN(len, buffer_limit);
+	if (to_read == 0) {
+		wslay_event_set_error(ctx, WSLAY_ERR_WOULDBLOCK);
+		return -1;
+	}
 	int read = 0;
-	Error err = conn->get_partial_data(data, len, read);
+	Error err = conn->get_partial_data(data, to_read, read);
 	if (err != OK) {
 		print_verbose("Websocket get data error: " + itos(err) + ", read (should be 0!): " + itos(read));
 		wslay_event_set_error(ctx, WSLAY_ERR_CALLBACK_FAILURE);
@@ -582,6 +591,37 @@ ssize_t WSLPeer::_wsl_recv_callback(wslay_event_context_ptr ctx, uint8_t *data,
 	return read;
 }
 
+void WSLPeer::_wsl_recv_start_callback(wslay_event_context_ptr ctx, const struct wslay_event_on_frame_recv_start_arg *arg, void *user_data) {
+	WSLPeer *peer = (WSLPeer *)user_data;
+	uint8_t op = arg->opcode;
+	if (op == WSLAY_TEXT_FRAME || op == WSLAY_BINARY_FRAME) {
+		// Get ready to process a data package.
+		PendingMessage &pm = peer->pending_message;
+		pm.opcode = op;
+		pm.payload_size = arg->payload_length;
+	}
+}
+
+void WSLPeer::_wsl_frame_recv_chunk_callback(wslay_event_context_ptr ctx, const struct wslay_event_on_frame_recv_chunk_arg *arg, void *user_data) {
+	WSLPeer *peer = (WSLPeer *)user_data;
+	PendingMessage &pm = peer->pending_message;
+	if (pm.opcode != 0) {
+		// Only write the payload.
+		peer->in_buffer.write_packet(arg->data, arg->data_length, nullptr);
+	}
+}
+
+void WSLPeer::_wsl_frame_recv_end_callback(wslay_event_context_ptr ctx, void *user_data) {
+	WSLPeer *peer = (WSLPeer *)user_data;
+	PendingMessage &pm = peer->pending_message;
+	if (pm.opcode != 0) {
+		// Only write the packet (since it's now completed).
+		uint8_t is_string = pm.opcode == WSLAY_TEXT_FRAME ? 1 : 0;
+		peer->in_buffer.write_packet(nullptr, pm.payload_size, &is_string);
+		pm.clear();
+	}
+}
+
 ssize_t WSLPeer::_wsl_send_callback(wslay_event_context_ptr ctx, const uint8_t *data, size_t len, int flags, void *user_data) {
 	WSLPeer *peer = (WSLPeer *)user_data;
 	Ref<StreamPeer> conn = peer->connection;
@@ -627,28 +667,19 @@ void WSLPeer::_wsl_msg_recv_callback(wslay_event_context_ptr ctx, const struct w
 		return;
 	}
 
-	if (peer->ready_state == STATE_CLOSING) {
-		return;
-	}
-
-	if (op == WSLAY_TEXT_FRAME || op == WSLAY_BINARY_FRAME) {
-		// Message.
-		uint8_t is_string = arg->opcode == WSLAY_TEXT_FRAME ? 1 : 0;
-		peer->in_buffer.write_packet(arg->msg, arg->msg_length, &is_string);
-	}
 	if (op == WSLAY_PONG) {
 		peer->heartbeat_waiting = false;
 	}
-	// Pong.
+	// Ping, or message (already parsed in chunks).
 }
 
 wslay_event_callbacks WSLPeer::_wsl_callbacks = {
 	_wsl_recv_callback,
 	_wsl_send_callback,
 	_wsl_genmask_callback,
-	nullptr, /* on_frame_recv_start_callback */
-	nullptr, /* on_frame_recv_callback */
-	nullptr, /* on_frame_recv_end_callback */
+	_wsl_recv_start_callback,
+	_wsl_frame_recv_chunk_callback,
+	_wsl_frame_recv_end_callback,
 	_wsl_msg_recv_callback
 };
 
@@ -836,6 +867,7 @@ void WSLPeer::close(int p_code, String p_reason) {
 	heartbeat_waiting = false;
 	in_buffer.clear();
 	packet_buffer.resize(0);
+	pending_message.clear();
 }
 
 IPAddress WSLPeer::get_connected_host() const {

+ 15 - 0
modules/websocket/wsl_peer.h

@@ -53,6 +53,10 @@ private:
 
 	// Callbacks.
 	static ssize_t _wsl_recv_callback(wslay_event_context_ptr ctx, uint8_t *data, size_t len, int flags, void *user_data);
+	static void _wsl_recv_start_callback(wslay_event_context_ptr ctx, const struct wslay_event_on_frame_recv_start_arg *arg, void *user_data);
+	static void _wsl_frame_recv_chunk_callback(wslay_event_context_ptr ctx, const struct wslay_event_on_frame_recv_chunk_arg *arg, void *user_data);
+	static void _wsl_frame_recv_end_callback(wslay_event_context_ptr ctx, void *user_data);
+
 	static ssize_t _wsl_send_callback(wslay_event_context_ptr ctx, const uint8_t *data, size_t len, int flags, void *user_data);
 	static int _wsl_genmask_callback(wslay_event_context_ptr ctx, uint8_t *buf, size_t len, void *user_data);
 	static void _wsl_msg_recv_callback(wslay_event_context_ptr ctx, const struct wslay_event_on_msg_recv_arg *arg, void *user_data);
@@ -80,6 +84,16 @@ private:
 		Resolver() {}
 	};
 
+	struct PendingMessage {
+		size_t payload_size = 0;
+		uint8_t opcode = 0;
+
+		void clear() {
+			payload_size = 0;
+			opcode = 0;
+		}
+	};
+
 	Resolver resolver;
 
 	// WebSocket connection state.
@@ -101,6 +115,7 @@ private:
 	uint8_t was_string = 0;
 	uint64_t last_heartbeat = 0;
 	bool heartbeat_waiting = false;
+	PendingMessage pending_message;
 
 	// WebSocket configuration.
 	bool use_tls = true;