浏览代码

check corner-case and add comments

Farzaneh Soltanzadeh 8 月之前
父节点
当前提交
0a67119d1d
共有 3 个文件被更改,包括 189 次插入85 次删除
  1. 164 55
      src/modules/rtpengine/rtpengine.c
  2. 25 2
      src/modules/rtpengine/rtpengine.h
  3. 0 28
      src/modules/rtpengine/rtpengine_common.h

+ 164 - 55
src/modules/rtpengine/rtpengine.c

@@ -3305,7 +3305,7 @@ static bencode_item_t *rtpp_function_call(bencode_buffer_t *bencbuf,
 	bencode_item_t *result;
 	pv_value_t pv_val;
 	str viabranch = STR_NULL;
-	str body = STR_NULL, error = STR_NULL;
+	str body = STR_NULL, error = STR_NULL, tmp_callid = STR_NULL;
 	int ret, queried_nodes = 0, cont_type = 0;
 	unsigned int parse_by_module = (p_viabranch) ? 0 : 1;
 	struct rtpp_node *node;
@@ -3349,7 +3349,10 @@ static bencode_item_t *rtpp_function_call(bencode_buffer_t *bencbuf,
 	} else {
 		ng_flags.dict = extra_dict;
 		ng_flags.flags = bencode_dictionary_get(ng_flags.dict, "flags");
-		bencode_dictionary_get_str(ng_flags.dict, "call-id", &ng_flags.call_id);
+		bencode_dictionary_get_str(ng_flags.dict, "call-id", &tmp_callid);
+		if(tmp_callid.len > 0) {
+			ng_flags.call_id = tmp_callid;
+		}
 	}
 
 	if(parse_by_module) {
@@ -6074,61 +6077,89 @@ static int bind_rtpengine(rtpengine_api_t *api)
 	return 0;
 }
 
+/**
+ * @brief Wraps the subscription request for RTPEngine.
+ *
+ * @details Constructs a bencode dictionary containing the subscription details.
+ * Handles different subscription modes.
+ *
+ * @param sess The RTPEngine session containing session-related data.
+ * @param op The RTPEngine operation (e.g., subscribe , unsubscribe).
+ * @param to_tag Optional tag identifying the participant (nullable).
+ * @param flags A space-separated string of feature flags used in rtpengine functions.
+ * @param subscribe_flags Flags controlling the subscription behavior (e.g., SIPREC mode, inactivity).
+ * @param body Optional SDP body for the subscribe answer command (nullable).
+ * @return A pointer to a bencode item representing the RTPEngine response, or NULL on failure.
+ */
 static bencode_item_t *rtpengine_subscribe_wrap(struct rtpengine_session *sess,
 		enum rtpe_operation op, str *to_tag, str *flags,
 		unsigned int subscribe_flags, str *body)
 {
 	static bencode_buffer_t bencbuf;
 	bencode_item_t *dict, *list = NULL;
-	bencode_item_t *ret;
 	struct sip_msg *msg;
 	str *viabranch = NULL;
 
+	// Initialize the bencode buffer
 	if(bencode_buffer_init(&bencbuf)) {
 		LM_ERR("could not initialize bencode_buffer_t\n");
 		return NULL;
 	}
-	dict = bencode_dictionary(&bencbuf);
 
-	if(sess->callid)
+	// Create and fill a dictionary with subscription data
+	dict = bencode_dictionary(&bencbuf);
+	if(sess->callid) {
 		bencode_dictionary_add_str(dict, "call-id", sess->callid);
-	else if(sess->msg)
+	} else if(sess->msg) {
 		bencode_dictionary_add_str(dict, "call-id", &sess->msg->callid->body);
-	if(sess->branch != RTPENGINE_ALL_BRANCHES)
+	}
+	if(sess->branch != RTPENGINE_ALL_BRANCHES) {
 		bencode_dictionary_add_str(dict, "via-branch", viabranch);
-	if(to_tag && to_tag->len)
+	}
+	if(to_tag && to_tag->len) {
 		bencode_dictionary_add_str(dict, "to-tag", to_tag);
+	}
 	if(subscribe_flags & RTP_SUBSCRIBE_MODE_SIPREC) {
 		list = bencode_list(&bencbuf);
 		bencode_list_add_string(list, "all");
 		bencode_list_add_string(list, "siprec");
-	} else if((subscribe_flags & RTP_COPY_LEG_BOTH) == RTP_COPY_LEG_BOTH) {
+	} else if((subscribe_flags & RTP_SUBSCRIBE_LEG_BOTH)
+			  == RTP_SUBSCRIBE_LEG_BOTH) {
 		list = bencode_list(&bencbuf);
 		bencode_list_add_string(list, "all");
-	} else if(subscribe_flags & RTP_COPY_LEG_CALLER && sess->from_tag) {
+	} else if(subscribe_flags & RTP_SUBSCRIBE_LEG_CALLER && sess->from_tag) {
 		bencode_dictionary_add_str(dict, "from-tag", sess->from_tag);
 	} else if(sess->to_tag) {
 		bencode_dictionary_add_str(dict, "from-tag", sess->to_tag);
 	}
 	if(subscribe_flags & RTP_SUBSCRIBE_MODE_DISABLE) {
-		if(!list)
+		if(!list) {
 			list = bencode_list(&bencbuf);
+		}
 		bencode_list_add_string(list, "inactive");
 	}
-	if(list)
+	if(list) {
 		bencode_dictionary_add(dict, "flags", list);
-
-	if(op == OP_SUBSCRIBE_ANSWER)
+	}
+	if(op == OP_SUBSCRIBE_ANSWER) {
 		bencode_dictionary_add_str(dict, "sdp", body);
+	}
 
+	// Use the session's message or a fake message
 	msg = (sess->msg ? sess->msg : faked_msg_get_next());
 
-	ret = rtpp_function_call_ok(
+	return rtpp_function_call_ok(
 			&bencbuf, msg, op, flags, NULL, NULL, NULL, dict);
-
-	return ret;
 }
 
+/**
+ * @brief Allocates and initializes a new participant tag.
+ *
+ * @details Note that to_tag must be freed after use.
+ *
+ * @param tag The tag to copy.
+ * @return A new allocated tag, or NULL on failure.
+ */
 static str *rtpengine_new_subs(str *tag)
 {
 	str *to_tag = shm_malloc(sizeof *to_tag + tag->len);
@@ -6137,102 +6168,180 @@ static str *rtpengine_new_subs(str *tag)
 		to_tag->len = tag->len;
 		memcpy(to_tag->s, tag->s, tag->len);
 	}
+
 	return to_tag;
 }
 
+/**
+ * @brief Copies RTPEngine stream data into a structured format.
+ *
+ * @details Extracts stream information from a bencode item and populates the result structure.
+ * Validates stream data.
+ *
+ * @param streams The bencode item containing stream data.
+ * @param ret The structure to populate with stream data.
+ * @param sess The rtpengine session containing session-related data.
+ * @return void.
+ */
 static void rtpengine_copy_streams(bencode_item_t *streams,
-		struct rtpengine_streams *ret, struct rtpengine_session *sess)
+		struct rtpengine_streams *ret_streams, struct rtpengine_session *sess)
 {
 	bencode_item_t *item, *medias;
-	str tmp = STR_NULL;
-	int leg = RTPENGINE_CALLEE, medianum, label, s;
-	if(!ret || !streams)
+	str tag = STR_NULL, label_str = STR_NULL;
+	int leg = RTPENGINE_CALLEE, medianum, label, streams_count;
+
+	if(!ret_streams || !streams) {
 		return;
-	ret->count = 0;
-	s = 0;
+	}
+
+	ret_streams->count = 0;
+	streams_count = 0;
+
+	// Iterate through each item (each item is related to a participant)
 	for(item = streams->child; item; item = item->sibling) {
-		tmp.s = bencode_dictionary_get_string(item, "tag", &tmp.len);
-		if(!tmp.s)
+		// Set leg based on participant tag
+		tag.s = bencode_dictionary_get_string(item, "tag", &tag.len);
+		if(!tag.s) {
 			LM_WARN("could not retrieve tag - placing to %s\n",
 					(leg == RTPENGINE_CALLER ? "caller" : "callee"));
-		else {
+		} else {
 			if(sess->from_tag && sess->from_tag->len > 0
-					&& str_strcmp(&tmp, sess->from_tag) == 0)
+					&& str_strcmp(&tag, sess->from_tag) == 0) {
 				leg = RTPENGINE_CALLER;
-			else if(sess->to_tag && sess->to_tag->len > 0
-					&& str_strcmp(&tmp, sess->to_tag) != 0)
+			} else if(sess->to_tag && sess->to_tag->len > 0
+					  && str_strcmp(&tag, sess->to_tag) != 0) {
 				leg = RTPENGINE_CALLER;
-			else
+			} else {
 				leg = RTPENGINE_CALLEE;
+			}
 		}
+		// Retrieve media streams
 		medias = bencode_dictionary_get_expect(item, "medias", BENCODE_LIST);
-		if(!medias)
+		if(!medias) {
 			continue;
+		}
+		// Iterate through each media stream
 		for(medias = medias->child; medias; medias = medias->sibling) {
-			s = ret->count;
-			if(s == RTP_COPY_MAX_STREAMS) {
+			streams_count = ret_streams->count;
+			if(streams_count == RTP_SUBSCRIBE_MAX_STREAMS) {
 				LM_WARN("maximum amount of streams %d reached!\n",
-						RTP_COPY_MAX_STREAMS);
+						RTP_SUBSCRIBE_MAX_STREAMS);
 				return;
 			}
 			medianum = bencode_dictionary_get_integer(item, "index", 0);
-			tmp.s = bencode_dictionary_get_string(medias, "label", &tmp.len);
-			if(str2sint(&tmp, &label) < 0) {
+			label_str.s = bencode_dictionary_get_string(
+					medias, "label", &label_str.len);
+			if(str2sint(&label_str, &label) < 0) {
 				LM_WARN("invalid label %.*s - not integer - skipping\n",
-						tmp.len, tmp.s);
+						label_str.len, label_str.s);
 				continue;
 			}
-			ret->streams[s].leg = leg;
-			ret->streams[s].label = label;
-			ret->streams[s].medianum = medianum;
-			ret->count++;
+			ret_streams->streams[streams_count].leg = leg;
+			ret_streams->streams[streams_count].label = label;
+			ret_streams->streams[streams_count].medianum = medianum;
+			ret_streams->count++;
 		}
 	}
 }
 
+/**
+ * @brief Sends a subscribe request command to RTPEngine to request subscription (i.e. receiving a copy of the media).
+ *
+ * @details This function interacts with RTPEngine to request a copy of media streams for the given session.
+ * Note: The to_tag must be freed after use.
+ *
+ * @param sess The rtpengine session containing session-related data.
+ * @param to_tag Pointer to a participant tag which corresponds to the subscription. If NULL, RTPEngine generates it.
+ * @param flags A space-separated string of feature flags used in rtpengine functions.
+ * @param subscribe_flags Flags controlling the subscription behavior (e.g., SIPREC mode, inactivity).
+ * @param ret_body Pointer to a string to store the SDP body extracted from RTPEngine's response.
+ * @param ret_streams Pointer to a structure to store stream data extracted from RTPEngine's response.
+ * @return 1 on success, -1 on failure.
+ */
 static int ki_rtpengine_subscribe_request(struct rtpengine_session *sess,
 		str **to_tag, str *flags, unsigned int subscribe_flags, str *ret_body,
 		struct rtpengine_streams *ret_streams)
 {
-	str tmp;
-	bencode_item_t *ret;
-	ret = rtpengine_subscribe_wrap(
+	str tmp_to_tag;
+	bencode_item_t *dict;
+
+	// Wrap the subscribe request command and send to RTPEngine
+	dict = rtpengine_subscribe_wrap(
 			sess, OP_SUBSCRIBE_REQUEST, *to_tag, flags, subscribe_flags, NULL);
-	if(!ret)
+	if(!dict) {
 		return -1;
-	if(!bencode_dictionary_get_str_dup(ret, "sdp", ret_body))
+	}
+	// Extract SDP body from the RTPEngine response
+	if(!bencode_dictionary_get_str_dup(dict, "sdp", ret_body)) {
 		LM_ERR("failed to extract sdp body from proxy reply\n");
-	if(ret_streams)
+	}
+	// Extract stream data from the RTPEngine response
+	if(ret_streams) {
 		rtpengine_copy_streams(
-				bencode_dictionary_get(ret, "tag-medias"), ret_streams, sess);
-	if(!bencode_dictionary_get_str(ret, "to-tag", &tmp))
+				bencode_dictionary_get(dict, "tag-medias"), ret_streams, sess);
+	}
+	// Extract and allocate a new to-tag from the RTPEngine response
+	if(!bencode_dictionary_get_str(dict, "to-tag", &tmp_to_tag)) {
 		LM_ERR("failed to extract to-tag from proxy reply\n");
-	else
-		*to_tag = rtpengine_new_subs(&tmp);
-	bencode_buffer_free(bencode_item_buffer(ret));
-	return 0;
+	} else {
+		*to_tag = rtpengine_new_subs(&tmp_to_tag);
+	}
+	// Free the bencode buffer
+	bencode_buffer_free(bencode_item_buffer(dict));
+
+	return 1;
 }
 
+/**
+ * @brief Sends a subscribe answer command to RTPEngine.
+ *
+ * @details If successful, media forwarding will start to the endpoint given in the SDP (body param).
+ *
+ * @param sess The rtpengine session containing session-related data.
+ * @param to_tag The participant tag identifying the subscription.
+ * @param flags A space-separated string of feature flags used in rtpengine functions.
+ * @param body The SDP body to send as part of the subscribe answer command.
+ * @return 1 on success, -1 on failure.
+ */
 static int ki_rtpengine_subscribe_answer(
 		struct rtpengine_session *sess, str *to_tag, str *flags, str *body)
 {
 	bencode_item_t *ret;
+
+	// Wrap the subscribe answer command and send to RTPEngine
 	ret = rtpengine_subscribe_wrap(
 			sess, OP_SUBSCRIBE_ANSWER, to_tag, flags, 0, body);
-	if(!ret)
+	if(!ret) {
 		return -1;
+	}
+	// Free the bencode buffer
 	bencode_buffer_free(bencode_item_buffer(ret));
 	return ret != NULL;
 }
 
+/**
+ * @brief Sends an unsubscribe command to RTPEngine to stop an established subscription.
+ *
+ * @details The subscription to be stopped is identified by the provided to-tag.
+ *
+ * @param sess The rtpengine session containing session-related data.
+ * @param to_tag The participant tag identifying the subscription to stop.
+ * @param flags: A space-separated string of feature flags used in rtpengine functions.
+ * @return 1 on success, -1 on failure.
+ */
 static int ki_rtpengine_unsubscribe(
 		struct rtpengine_session *sess, str *to_tag, str *flags)
 {
 	bencode_item_t *ret;
+
+	// Wrap the unsubscribe command and send to RTPEngine
 	ret = rtpengine_subscribe_wrap(
 			sess, OP_UNSUBSCRIBE, to_tag, flags, 0, NULL);
-	if(!ret)
+	if(!ret) {
 		return -1;
+	}
+	// Free the bencode buffer
 	bencode_buffer_free(bencode_item_buffer(ret));
+
 	return ret != NULL;
 }

+ 25 - 2
src/modules/rtpengine/rtpengine.h

@@ -28,12 +28,23 @@
 #include "bencode.h"
 #include "../../core/str.h"
 #include "../../core/locking.h"
-#include "rtpengine_common.h"
 
 #define RTPENGINE_MIN_RECHECK_TICKS 0
 #define RTPENGINE_MAX_RECHECK_TICKS ((unsigned int)-1)
 #define RTPENGINE_ALL_BRANCHES -1
 
+#define RTPENGINE_CALLER 0
+#define RTPENGINE_CALLEE 1
+
+#define RTP_SUBSCRIBE_MODE_SIPREC (1 << 0)
+#define RTP_SUBSCRIBE_MODE_DISABLE (1 << 1)
+
+#define RTP_SUBSCRIBE_LEG_CALLER (1 << 2)
+#define RTP_SUBSCRIBE_LEG_CALLEE (1 << 3)
+#define RTP_SUBSCRIBE_LEG_BOTH \
+	(RTP_SUBSCRIBE_LEG_CALLER | RTP_SUBSCRIBE_LEG_CALLEE)
+#define RTP_SUBSCRIBE_MAX_STREAMS 32
+
 enum rtpe_operation
 {
 	OP_OFFER = 1,
@@ -138,7 +149,19 @@ struct rtpengine_session
 	str *callid;
 	str *from_tag;
 	str *to_tag;
-	str *body;
+};
+
+struct rtpengine_stream
+{
+	int leg;	  /* corresponds to participant, 0: caller , 1: callee */
+	int medianum; /* sequentially numbered index of the media for each participant, starting with one */
+	int label;	  /* label of media stream */
+};
+
+struct rtpengine_streams
+{
+	int count;
+	struct rtpengine_stream streams[RTP_SUBSCRIBE_MAX_STREAMS];
 };
 
 #endif

+ 0 - 28
src/modules/rtpengine/rtpengine_common.h

@@ -1,28 +0,0 @@
-#ifndef _RTPENGINE_COMMON_H_
-#define _RTPENGINE_COMMON_H_
-
-#define RTPENGINE_CALLER 0
-#define RTPENGINE_CALLEE 1
-
-#define RTP_SUBSCRIBE_MODE_SIPREC (1 << 0)
-#define RTP_SUBSCRIBE_MODE_DISABLE (1 << 1)
-
-#define RTP_COPY_LEG_CALLER (1 << 2)
-#define RTP_COPY_LEG_CALLEE (1 << 3)
-#define RTP_COPY_LEG_BOTH (RTP_COPY_LEG_CALLER | RTP_COPY_LEG_CALLEE)
-#define RTP_COPY_MAX_STREAMS 32
-
-struct rtpengine_stream
-{
-	int leg;
-	int medianum;
-	int label;
-};
-
-struct rtpengine_streams
-{
-	int count;
-	struct rtpengine_stream streams[RTP_COPY_MAX_STREAMS];
-};
-
-#endif /* _RTPENGINE_COMMON_H_ */