浏览代码

rtpengine: support daisy-chaining two RTP proxies

Richard Fuchs 11 年之前
父节点
当前提交
f049d553bf
共有 2 个文件被更改,包括 195 次插入97 次删除
  1. 19 5
      modules/rtpengine/doc/rtpengine_admin.xml
  2. 176 92
      modules/rtpengine/rtpengine.c

+ 19 - 5
modules/rtpengine/doc/rtpengine_admin.xml

@@ -245,7 +245,7 @@ modparam("rtpengine", "setid_avp", "$avp(setid)")
 	<title>Functions</title>
 	<section id="rtpengine.f.set_rtpengine_set">
 		<title>
-		<function moreinfo="none">set_rtpengine_set(setid)</function>
+		<function moreinfo="none">set_rtpengine_set(setid[, setid])</function>
 		</title>
 		<para>
 		Sets the ID of the &rtp; proxy set to be used for the next
@@ -254,6 +254,20 @@ modparam("rtpengine", "setid_avp", "$avp(setid)")
 		a config variable holding an integer.
 		</para>
 		<para>
+		A second set ID can be specified to daisy-chain two &rtp; proxies.
+		The two set IDs must be distinct from each other and there must not
+		be any overlap in the proxies present in both sets. In this use case,
+		the request (offer, answer, etc) is first sent to an &rtp; proxy from
+		the first set, which rewrites the &sdp; body and sends it back to the
+		module. The rewritten &sdp; body is then used to make another request
+		to an &rtp; proxy from the second set, which rewrites the &sdp; body
+		another time and sends it back to the module to be placed back into the
+		&sip; message. This is useful if you have a set of &rtp; proxies that
+		the caller must use, and another distinct set of &rtp; proxies that the
+		callee must use. This is supported by all rtpengine commands except
+		rtpengine_manage().
+		</para>
+		<para>
 		This function can be used from REQUEST_ROUTE, ONREPLY_ROUTE,
 		BRANCH_ROUTE.
 		</para>
@@ -321,17 +335,17 @@ rtpengine_offer();
 				</para></listitem>
 				<listitem><para>
 				<emphasis>internal, external</emphasis> - these flags specify the direction of
-				the SIP message. These flags only make sense when the &rtp; proxy is running
+				the &sip; message. These flags only make sense when the &rtp; proxy is running
 				in bridge mode. <quote>internal</quote> corresponds to the proxy's first
 				interface, <quote>external</quote> corresponds to the &rtp; proxy's
 				second interface. You always have to specify two flags to define
 				the incoming network and the outgoing network. For example, <quote>internal
 				external</quote> should be
-				used for SIP message received from the local interface and sent out on the
+				used for &sip; message received from the local interface and sent out on the
 				external interface, and <quote>external internal</quote> vice versa. Other
 				options are <quote>internal internal</quote> and <quote>external
 				external</quote>.
-				So, for example if a SIP requests is processed with <quote>internal
+				So, for example if a &sip; requests is processed with <quote>internal
 				external</quote> flags, the corresponding
 				response must be processed with <quote>internal external</quote> flags.
 				</para></listitem>
@@ -371,7 +385,7 @@ rtpengine_offer();
 				<listitem><para>
 				<emphasis>trust-address</emphasis> - flags that IP address in SDP should
 				be trusted. Without this flag, the &rtp; proxy ignores address in
-				the SDP and uses source address of the SIP message as media
+				the SDP and uses source address of the &sip; message as media
 				address which is passed to the RTP proxy.
 				</para></listitem>
 				<listitem><para>

+ 176 - 92
modules/rtpengine/rtpengine.c

@@ -301,7 +301,7 @@ static int rtpengine_manage1_f(struct sip_msg *, char *, char *);
 
 static int parse_flags(struct ng_flags_parse *, struct sip_msg *, enum rtpe_operation *, const char *);
 
-static int rtpengine_offer_answer(struct sip_msg *msg, const char *flags, int op);
+static int rtpengine_offer_answer(struct sip_msg *msg, const char *flags, int op, int more);
 static int fixup_set_id(void ** param, int param_no);
 static int set_rtpengine_set_f(struct sip_msg * msg, char * str1, char * str2);
 static struct rtpp_set * select_rtpp_set(int id_set);
@@ -339,9 +339,13 @@ static int rtpp_sets=0; /*used in rtpengine_set_store()*/
 static int rtpp_set_count = 0;
 static unsigned int current_msg_id = (unsigned int)-1;
 /* RTP proxy balancing list */
-struct rtpp_set_head * rtpp_set_list =0;
-struct rtpp_set * selected_rtpp_set =0;
-struct rtpp_set * default_rtpp_set=0;
+static struct rtpp_set_head * rtpp_set_list =0;
+static struct rtpp_set * active_rtpp_set =0;
+static struct rtpp_set * selected_rtpp_set_1 =0;
+static struct rtpp_set * selected_rtpp_set_2 =0;
+static struct rtpp_set * default_rtpp_set=0;
+
+static str body_intermediate;
 
 /* array with the sockets used by rtpporxy (per process)*/
 static unsigned int rtpp_no = 0;
@@ -367,6 +371,9 @@ static cmd_export_t cmds[] = {
 	{"set_rtpengine_set",  (cmd_function)set_rtpengine_set_f,    1,
 		fixup_set_id, 0,
 		ANY_ROUTE},
+	{"set_rtpengine_set",  (cmd_function)set_rtpengine_set_f,    2,
+		fixup_set_id, 0,
+		ANY_ROUTE},
 	{"start_recording",    (cmd_function)start_recording_f,      0,
 		0, 0,
 		ANY_ROUTE },
@@ -1337,6 +1344,7 @@ static bencode_item_t *rtpp_function_call(bencode_buffer_t *bencbuf, struct sip_
 	}
 	ng_flags.dict = bencode_dictionary(bencbuf);
 
+	body.s = NULL;
 	if (op == OP_OFFER || op == OP_ANSWER) {
 		ng_flags.flags = bencode_list(bencbuf);
 		ng_flags.direction = bencode_list(bencbuf);
@@ -1347,7 +1355,10 @@ static bencode_item_t *rtpp_function_call(bencode_buffer_t *bencbuf, struct sip_
 			LM_ERR("can't extract body from the message\n");
 			goto error;
 		}
-		bencode_dictionary_add_str(ng_flags.dict, "sdp", &body);
+		if (body_intermediate.s)
+			bencode_dictionary_add_str(ng_flags.dict, "sdp", &body_intermediate);
+		else
+			bencode_dictionary_add_str(ng_flags.dict, "sdp", &body);
 	}
 
 	/*** parse flags & build dictionary ***/
@@ -1420,7 +1431,7 @@ static bencode_item_t *rtpp_function_call(bencode_buffer_t *bencbuf, struct sip_
 	}
 
 	if(msg->id != current_msg_id)
-		selected_rtpp_set = default_rtpp_set;
+		active_rtpp_set = default_rtpp_set;
 
 	do {
 		node = select_rtpp_node(callid, 1);
@@ -1691,13 +1702,13 @@ select_rtpp_node(str callid, int do_test)
 	struct rtpp_node* node;
 	int was_forced;
 
-	if(!selected_rtpp_set){
+	if(!active_rtpp_set){
 		LM_ERR("script error -no valid set selected\n");
 		return NULL;
 	}
 	/* Most popular case: 1 proxy, nothing to calculate */
-	if (selected_rtpp_set->rtpp_node_count == 1) {
-		node = selected_rtpp_set->rn_first;
+	if (active_rtpp_set->rtpp_node_count == 1) {
+		node = active_rtpp_set->rn_first;
 		if (node->rn_disabled && node->rn_recheck_ticks <= get_ticks())
 			node->rn_disabled = rtpp_test(node, 1, 0);
 		return node->rn_disabled ? NULL : node;
@@ -1711,7 +1722,7 @@ select_rtpp_node(str callid, int do_test)
 	was_forced = 0;
 retry:
 	weight_sum = 0;
-	for (node=selected_rtpp_set->rn_first; node!=NULL; node=node->rn_next) {
+	for (node=active_rtpp_set->rn_first; node!=NULL; node=node->rn_next) {
 
 		if (node->rn_disabled && node->rn_recheck_ticks <= get_ticks()){
 			/* Try to enable if it's time to try. */
@@ -1725,7 +1736,7 @@ retry:
 		if (was_forced)
 			return NULL;
 		was_forced = 1;
-		for(node=selected_rtpp_set->rn_first; node!=NULL; node=node->rn_next) {
+		for(node=active_rtpp_set->rn_first; node!=NULL; node=node->rn_next) {
 			node->rn_disabled = rtpp_test(node, 1, 1);
 		}
 		goto retry;
@@ -1735,7 +1746,7 @@ retry:
 	 * sumcut here lays from 0 to weight_sum-1.
 	 * Scan proxy list and decrease until appropriate proxy is found.
 	 */
-	for (node=selected_rtpp_set->rn_first; node!=NULL; node=node->rn_next) {
+	for (node=active_rtpp_set->rn_first; node!=NULL; node=node->rn_next) {
 		if (node->rn_disabled)
 			continue;
 		if (sumcut < node->rn_weight)
@@ -1769,7 +1780,7 @@ get_extra_id(struct sip_msg* msg, str *id_str) {
 }
 
 static int
-set_rtpengine_set_from_avp(struct sip_msg *msg)
+set_rtpengine_set_from_avp(struct sip_msg *msg, int direction)
 {
     struct usr_avp *avp;
     int_str setid_val;
@@ -1777,15 +1788,21 @@ set_rtpengine_set_from_avp(struct sip_msg *msg)
     if ((setid_avp_param == NULL) ||
 	(avp = search_first_avp(setid_avp_type, setid_avp, &setid_val, 0))
 	== NULL)
-	return 1;
+    {
+	    if (direction == 1 || !selected_rtpp_set_2)
+		    active_rtpp_set = selected_rtpp_set_1;
+	    else
+		    active_rtpp_set = selected_rtpp_set_2;
+	    return 1;
+    }
 
     if (avp->flags&AVP_VAL_STR) {
 	LM_ERR("setid_avp must hold an integer value\n");
 	return -1;
     }
 
-    selected_rtpp_set = select_rtpp_set(setid_val.n);
-    if(selected_rtpp_set == NULL) {
+    active_rtpp_set = select_rtpp_set(setid_val.n);
+    if(active_rtpp_set == NULL) {
 	LM_ERR("could not locate rtpproxy set %d\n", setid_val.n);
 	return -1;
     }
@@ -1801,76 +1818,127 @@ static int rtpengine_delete(struct sip_msg *msg, const char *flags) {
 	return rtpp_function_call_simple(msg, OP_DELETE, flags);
 }
 
+static int rtpengine_rtpp_set_wrap(struct sip_msg *msg, int (*func)(struct sip_msg *msg, void *, int),
+		void *data, int direction)
+{
+	int ret, more;
+
+	body_intermediate.s = NULL;
+
+	if (set_rtpengine_set_from_avp(msg, direction) == -1)
+	    return -1;
+
+	more = 1;
+	if (!selected_rtpp_set_2 || selected_rtpp_set_2 == selected_rtpp_set_1)
+		more = 0;
+
+	ret = func(msg, data, more);
+	if (ret < 0)
+		return ret;
+
+	if (!more)
+		return ret;
+
+	direction = (direction == 1) ? 2 : 1;
+	if (set_rtpengine_set_from_avp(msg, direction) == -1)
+	    return -1;
+
+	ret = func(msg, data, 0);
+	body_intermediate.s = NULL;
+	return ret;
+}
+
+static int rtpengine_delete_wrap(struct sip_msg *msg, void *d, int more) {
+	return rtpengine_delete(msg, d);
+}
+
 static int
 rtpengine_delete1_f(struct sip_msg* msg, char* str1, char* str2)
 {
 	str flags;
 
-	if (set_rtpengine_set_from_avp(msg) == -1)
-	    return -1;
-
 	flags.s = NULL;
 	if (str1)
 		get_str_fparam(&flags, msg, (fparam_t *) str1);
 
-	return rtpengine_delete(msg, flags.s);
+	return rtpengine_rtpp_set_wrap(msg, rtpengine_delete_wrap, flags.s, 1);
 }
 
 /* This function assumes p points to a line of requested type. */
 
 static int
-set_rtpengine_set_f(struct sip_msg * msg, char * str1, char * str2)
+set_rtpengine_set_n(struct sip_msg *msg, rtpp_set_link_t *rtpl, struct rtpp_set **out)
 {
-	rtpp_set_link_t *rtpl;
 	pv_value_t val;
 	struct rtpp_node *node;
-	int nb_active_nodes;
-
-	rtpl = (rtpp_set_link_t*)str1;
-
-	current_msg_id = 0;
-	selected_rtpp_set = 0;
-	nb_active_nodes = 0;
+	int nb_active_nodes = 0;
 
 	if(rtpl->rset != NULL) {
 		current_msg_id = msg->id;
-		selected_rtpp_set = rtpl->rset;
-	} else {
-		if(pv_get_spec_value(msg, rtpl->rpv, &val)<0) {
-			LM_ERR("cannot evaluate pv param\n");
-			return -1;
-		}
-		if(!(val.flags & PV_VAL_INT)) {
-			LM_ERR("pv param must hold an integer value\n");
-			return -1;
-		}
-		selected_rtpp_set = select_rtpp_set(val.ri);
-		if(selected_rtpp_set==NULL) {
-			LM_ERR("could not locate rtpengine set %d\n", val.ri);
-			return -1;
-		}
-		current_msg_id = msg->id;
+		*out = rtpl->rset;
+		return 1;
+	}
 
-		node = selected_rtpp_set->rn_first;
-		while (node != NULL)
-		{
-		    if (node->rn_disabled == 0) nb_active_nodes++;
-		    node = node->rn_next;
-		}
+	if(pv_get_spec_value(msg, rtpl->rpv, &val)<0) {
+		LM_ERR("cannot evaluate pv param\n");
+		return -1;
+	}
+	if(!(val.flags & PV_VAL_INT)) {
+		LM_ERR("pv param must hold an integer value\n");
+		return -1;
+	}
+	*out = select_rtpp_set(val.ri);
+	if(*out==NULL) {
+		LM_ERR("could not locate rtpengine set %d\n", val.ri);
+		return -1;
+	}
+	current_msg_id = msg->id;
 
-		if ( nb_active_nodes > 0 )
-		{
-			LM_DBG("rtpp: selected proxy set ID %d with %d active nodes.\n",
-			       current_msg_id, nb_active_nodes);
-			return nb_active_nodes;
-		}
-		else
-		{
-			LM_WARN("rtpp: selected proxy set ID %d but it has no active node.\n",
-			         current_msg_id);
-			return -2;
-		}
+	node = (*out)->rn_first;
+	while (node != NULL)
+	{
+	    if (node->rn_disabled == 0) nb_active_nodes++;
+	    node = node->rn_next;
+	}
+
+	if ( nb_active_nodes > 0 )
+	{
+		LM_DBG("rtpp: selected proxy set ID %d with %d active nodes.\n",
+		       current_msg_id, nb_active_nodes);
+		return nb_active_nodes;
+	}
+	else
+	{
+		LM_WARN("rtpp: selected proxy set ID %d but it has no active node.\n",
+			 current_msg_id);
+		return -2;
+	}
+}
+
+static int
+set_rtpengine_set_f(struct sip_msg * msg, char * str1, char * str2)
+{
+	rtpp_set_link_t *rtpl1, *rtpl2;
+	int ret;
+
+	rtpl1 = (rtpp_set_link_t*)str1;
+	rtpl2 = (rtpp_set_link_t*)str2;
+
+	current_msg_id = 0;
+	active_rtpp_set = 0;
+	selected_rtpp_set_1 = 0;
+	selected_rtpp_set_2 = 0;
+
+	ret = set_rtpengine_set_n(msg, rtpl1, &selected_rtpp_set_1);
+	if (ret < 0)
+		return ret;
+
+	if (rtpl2) {
+		ret = set_rtpengine_set_n(msg, rtpl2, &selected_rtpp_set_2);
+		if (ret < 0)
+			return ret;
 	}
+
 	return 1;
 }
 
@@ -1903,9 +1971,9 @@ rtpengine_manage(struct sip_msg *msg, const char *flags)
 
 	if(msg->first_line.type == SIP_REQUEST) {
 		if(method==METHOD_ACK && nosdp==0)
-			return rtpengine_offer_answer(msg, flags, OP_ANSWER);
+			return rtpengine_offer_answer(msg, flags, OP_ANSWER, 0);
 		if(method==METHOD_UPDATE && nosdp==0)
-			return rtpengine_offer_answer(msg, flags, OP_OFFER);
+			return rtpengine_offer_answer(msg, flags, OP_OFFER, 0);
 		if(method==METHOD_INVITE && nosdp==0) {
 			msg->msg_flags |= FL_SDP_BODY;
 			if(tmb.t_gett!=NULL && tmb.t_gett()!=NULL
@@ -1913,38 +1981,43 @@ rtpengine_manage(struct sip_msg *msg, const char *flags)
 				tmb.t_gett()->uas.request->msg_flags |= FL_SDP_BODY;
 			if(route_type==FAILURE_ROUTE)
 				return rtpengine_delete(msg, flags);
-			return rtpengine_offer_answer(msg, flags, OP_OFFER);
+			return rtpengine_offer_answer(msg, flags, OP_OFFER, 0);
 		}
 	} else if(msg->first_line.type == SIP_REPLY) {
 		if(msg->first_line.u.reply.statuscode>=300)
 			return rtpengine_delete(msg, flags);
 		if(nosdp==0) {
 			if(method==METHOD_UPDATE)
-				return rtpengine_offer_answer(msg, flags, OP_ANSWER);
+				return rtpengine_offer_answer(msg, flags, OP_ANSWER, 0);
 			if(tmb.t_gett==NULL || tmb.t_gett()==NULL
 					|| tmb.t_gett()==T_UNDEFINED)
-				return rtpengine_offer_answer(msg, flags, OP_ANSWER);
+				return rtpengine_offer_answer(msg, flags, OP_ANSWER, 0);
 			if(tmb.t_gett()->uas.request->msg_flags & FL_SDP_BODY)
-				return rtpengine_offer_answer(msg, flags, OP_ANSWER);
-			return rtpengine_offer_answer(msg, flags, OP_OFFER);
+				return rtpengine_offer_answer(msg, flags, OP_ANSWER, 0);
+			return rtpengine_offer_answer(msg, flags, OP_OFFER, 0);
 		}
 	}
 	return -1;
 }
 
+static int rtpengine_manage_wrap(struct sip_msg *msg, void *d, int more) {
+	return rtpengine_manage(msg, d);
+}
+
 static int
 rtpengine_manage1_f(struct sip_msg *msg, char *str1, char *str2)
 {
 	str flags;
 
-	if (set_rtpengine_set_from_avp(msg) == -1)
-	    return -1;
-
 	flags.s = NULL;
 	if (str1)
 		get_str_fparam(&flags, msg, (fparam_t *) str1);
 
-	return rtpengine_manage(msg, flags.s);
+	return rtpengine_rtpp_set_wrap(msg, rtpengine_manage_wrap, flags.s, 1);
+}
+
+static int rtpengine_offer_wrap(struct sip_msg *msg, void *d, int more) {
+	return rtpengine_offer_answer(msg, d, OP_OFFER, more);
 }
 
 static int
@@ -1952,13 +2025,15 @@ rtpengine_offer1_f(struct sip_msg *msg, char *str1, char *str2)
 {
 	str flags;
 
-	if (set_rtpengine_set_from_avp(msg) == -1)
-	    return -1;
-
 	flags.s = NULL;
 	if (str1)
 		get_str_fparam(&flags, msg, (fparam_t *) str1);
-	return rtpengine_offer_answer(msg, flags.s, OP_OFFER);
+
+	return rtpengine_rtpp_set_wrap(msg, rtpengine_offer_wrap, flags.s, 1);
+}
+
+static int rtpengine_answer_wrap(struct sip_msg *msg, void *d, int more) {
+	return rtpengine_offer_answer(msg, d, OP_ANSWER, more);
 }
 
 static int
@@ -1966,9 +2041,6 @@ rtpengine_answer1_f(struct sip_msg *msg, char *str1, char *str2)
 {
 	str flags;
 
-	if (set_rtpengine_set_from_avp(msg) == -1)
-	    return -1;
-
 	if (msg->first_line.type == SIP_REQUEST)
 		if (msg->first_line.u.request.method_value != METHOD_ACK)
 			return -1;
@@ -1976,11 +2048,12 @@ rtpengine_answer1_f(struct sip_msg *msg, char *str1, char *str2)
 	flags.s = NULL;
 	if (str1)
 		get_str_fparam(&flags, msg, (fparam_t *) str1);
-	return rtpengine_offer_answer(msg, flags.s, OP_ANSWER);
+
+	return rtpengine_rtpp_set_wrap(msg, rtpengine_answer_wrap, flags.s, 2);
 }
 
 static int
-rtpengine_offer_answer(struct sip_msg *msg, const char *flags, int op)
+rtpengine_offer_answer(struct sip_msg *msg, const char *flags, int op, int more)
 {
 	bencode_buffer_t bencbuf;
 	bencode_item_t *dict;
@@ -1996,14 +2069,21 @@ rtpengine_offer_answer(struct sip_msg *msg, const char *flags, int op)
 		goto error;
 	}
 
-	anchor = del_lump(msg, body.s - msg->buf, body.len, 0);
-	if (!anchor) {
-		LM_ERR("del_lump failed\n");
-		goto error_free;
-	}
-	if (!insert_new_lump_after(anchor, newbody.s, newbody.len, 0)) {
-		LM_ERR("insert_new_lump_after failed\n");
-		goto error_free;
+	if (body_intermediate.s)
+		pkg_free(body_intermediate.s);
+
+	if (more)
+		body_intermediate = newbody;
+	else {
+		anchor = del_lump(msg, body.s - msg->buf, body.len, 0);
+		if (!anchor) {
+			LM_ERR("del_lump failed\n");
+			goto error_free;
+		}
+		if (!insert_new_lump_after(anchor, newbody.s, newbody.len, 0)) {
+			LM_ERR("insert_new_lump_after failed\n");
+			goto error_free;
+		}
 	}
 
 	bencode_buffer_free(&bencbuf);
@@ -2017,10 +2097,14 @@ error:
 }
 
 
+static int rtpengine_start_recording_wrap(struct sip_msg *msg, void *d, int more) {
+	return rtpp_function_call_simple(msg, OP_START_RECORDING, NULL);
+}
+
 static int
 start_recording_f(struct sip_msg* msg, char *foo, char *bar)
 {
-	return rtpp_function_call_simple(msg, OP_START_RECORDING, NULL);
+	return rtpengine_rtpp_set_wrap(msg, rtpengine_start_recording_wrap, NULL, 1);
 }
 
 /*