浏览代码

Always_flags enhacement to support asymetric RTP clients

Always_flags may allow to forward packets received from diffrent
ip:port than was introduced in SDP offer/answer.
Tomas Mandys 14 年之前
父节点
当前提交
1dd018ec16
共有 2 个文件被更改,包括 78 次插入33 次删除
  1. 13 3
      modules/iptrtpproxy/doc/iptrtpproxy.xml
  2. 65 30
      modules/iptrtpproxy/iptrtpproxy.c

+ 13 - 3
modules/iptrtpproxy/doc/iptrtpproxy.xml

@@ -38,7 +38,7 @@
 	<emphasis>libipt_RTPPROXY</emphasis> userspace library. 
 	See <ulink url="http://www.2p.cz/en/netfilter_rtp_proxy">http://www.2p.cz/en/netfilter_rtp_proxy</ulink>
 	All RTP streams are 
-	manipulated directly in kernel space, no data is copied from 
+	manipulated directly in kernel space, no data are copied from 
 	kernel to userspace and back, it reduces load and delay. 
 	See <ulink url="http://www.2p.cz/en/netfilter_rtp_proxy">
 	http://www.2p.cz/en/netfilter_rtp_proxy</ulink> for more details.
@@ -439,7 +439,8 @@ Aggregation will take <emphasis>sip-addr</emphasis> from the first switchboard o
 		between rights in codec set and <function>remove_codec_mask</function> is non zero then
 		such a codec are to be removed. The result may be obtained from
 		<function>@iptrtpproxy.auth_rights</function> which returns max. right which has been applied when
-		processing all codecs of enabled streams.
+		processing all codecs of enabled streams. <function>@iptrtpproxy.active_media_num</function> may
+		be used to get info if some media stream is to be throttled.
 		</para>
 		<para>
 		The function MUST NOT be called after <function>iptrtpproxy_alloc/update</function>! 
@@ -488,7 +489,7 @@ Aggregation will take <emphasis>sip-addr</emphasis> from the first switchboard o
 			<listitem>
 			<para>
 				Supported parameters: <emphasis>expiration_timeout</emphasis>, <emphasis>ttl</emphasis>, <emphasis>learning_timeout</emphasis>, 
-				<emphasis>always_learn</emphasis>,
+				<emphasis>always_flags</emphasis>,
 				<emphasis>aggregation_a</emphasis>, <emphasis>aggregation_b</emphasis>, 
 				<emphasis>switchboard_a</emphasis>, <emphasis>switchboard_b</emphasis>,
 				<emphasis>throttle_mark</emphasis>,
@@ -641,6 +642,15 @@ Aggregation will take <emphasis>sip-addr</emphasis> from the first switchboard o
 		</para>
 		</section>
 
+		<section id="iptrtpproxy.auth_throttling">
+		<title>
+			<function>@iptrtpproxy.auth_throttling</function>
+		</title>
+		<para>
+		Returns 1 if <function>iptrtpproxy_authorize_media</function> detected that some media stream is to be throttled.
+		In this case RTP proxy should be applied.
+		</para>
+		</section>
 	</section>
 
 </section>

+ 65 - 30
modules/iptrtpproxy/iptrtpproxy.c

@@ -179,13 +179,14 @@ static struct {
 	int learning_timeout;
 	int expiration_timeout;
 	int ttl;
-	int always_learn;
+	int always_flags;
 	struct {
 		u_int32_t mark;
 		struct xt_rtpproxy_throttle_stat bandwidth[2];
 	} throttle;
 	struct codec_set_item *codec_set;
 	int remove_codec_mask;
+	int auth_throttling;
 	unsigned int auth_rights;
 	struct ipt_session protected_sess;
 } global_params;
@@ -242,7 +243,7 @@ enum {
 	PAR_EXPIRATION_TIMEOUT, 
 	PAR_TTL, 
 	PAR_LEARNING_TIMEOUT, 
-	PAR_ALWAYS_LEARN,
+	PAR_ALWAYS_FLAGS,
 	PAR_AGGREGATION_A, PAR_AGGREGATION_B, 
 	PAR_SWITCHBOARD_A, PAR_SWITCHBOARD_B,
 	PAR_AGGREGATION_BY_SIP_IP_A, PAR_AGGREGATION_BY_SIP_IP_B,
@@ -253,7 +254,8 @@ enum {
 	PAR_THROTTLE_MARK, 
 	PAR_THROTTLE_RTP_MAX_BYTES, PAR_THROTTLE_RTP_MAX_PACKETS, 
 	PAR_THROTTLE_RTCP_MAX_BYTES, PAR_THROTTLE_RTCP_MAX_PACKETS,
-	PAR_CODEC_SET, PAR_AUTH_RIGHTS, PAR_REMOVE_CODEC_MASK
+	PAR_CODEC_SET, PAR_AUTH_RIGHTS, PAR_REMOVE_CODEC_MASK,
+	PAR_AUTH_THROTTLING
 };
 
 enum {
@@ -268,7 +270,7 @@ static struct {
 	{"expiration_timeout", PAR_EXPIRATION_TIMEOUT, PAR_READ|PAR_WRITE|PAR_INT},
 	{"ttl", PAR_TTL, PAR_READ|PAR_WRITE|PAR_INT},
 	{"learning_timeout", PAR_LEARNING_TIMEOUT, PAR_READ|PAR_WRITE|PAR_INT},
-	{"always_learn", PAR_ALWAYS_LEARN, PAR_READ|PAR_WRITE|PAR_INT},
+	{"always_flags", PAR_ALWAYS_FLAGS, PAR_READ|PAR_WRITE|PAR_INT},
 	{"aggregation_a", PAR_AGGREGATION_A, PAR_READ|PAR_WRITE|PAR_STR},
 	{"aggregation_b", PAR_AGGREGATION_B, PAR_READ|PAR_WRITE|PAR_STR|PAR_DIR},
 	{"switchboard_a", PAR_SWITCHBOARD_A, PAR_READ|PAR_WRITE|PAR_STR},
@@ -291,6 +293,7 @@ static struct {
 	{"codec_set", PAR_CODEC_SET, PAR_READ|PAR_WRITE|PAR_STR},
 	{"remove_codec_mask", PAR_REMOVE_CODEC_MASK, PAR_READ|PAR_WRITE|PAR_INT},
 	{"auth_rights", PAR_AUTH_RIGHTS, PAR_READ|PAR_INT},
+	{"auth_throttling", PAR_AUTH_THROTTLING, PAR_READ|PAR_WRITE|PAR_INT},
 	{ NULL }
 };
 
@@ -462,6 +465,7 @@ struct sdp_codec {
 struct sdp_session {
 	str oline_user_s;
 	str oline_addr_s;
+	str cline_ip_s;
 	unsigned int media_count;
 	struct {
 		int active;  /* if SDP has been parsed correctly, has a IP (even 0.0.0.0), port!=0 and has supported params */
@@ -544,7 +548,7 @@ static int prefix2enum(str *line, str (*list)[]) {
 /* SDP RFC2327 */
 static int parse_sdp_content(struct sip_msg* msg, struct sdp_session *sess) {
 	char *p, *pend, *cp, *cp2, *lend;
-	str line, lvalue, cline_ip_s, body;
+	str line, lvalue, body;
 	int sess_fl, i, cline_count, codec_count;
 	char ltype, savec;
 	unsigned int cline_ip;
@@ -625,8 +629,8 @@ static int parse_sdp_content(struct sip_msg* msg, struct sdp_session *sess) {
 	pend = body.s + body.len;
 	sess_fl = 0;
 	sess->media_count = 0;
-	cline_ip_s.s = NULL;  /* make gcc happy */
-	cline_ip_s.len = 0;
+	sess->cline_ip_s.s = NULL;  /* make gcc happy */
+	sess->cline_ip_s.len = 0;
 	cline_ip = 0;
 	cline_count = 0;
 	codec_count = 0;
@@ -688,7 +692,8 @@ static int parse_sdp_content(struct sip_msg* msg, struct sdp_session *sess) {
 								goto invalidate;
 							}
 							else {
-								cline_ip_s.len = 0;
+								sess->cline_ip_s.len = 0;
+								sess->cline_ip_s.s = NULL;
 							}
 							break;
 						}
@@ -711,7 +716,7 @@ static int parse_sdp_content(struct sip_msg* msg, struct sdp_session *sess) {
 							goto invalidate;
 						}
 						if (sess_fl == 1) {
-							cline_ip_s = lvalue;
+							sess->cline_ip_s = lvalue;
 							cline_ip = s2ip4(&lvalue);
 						}
 						else {
@@ -781,8 +786,8 @@ static int parse_sdp_content(struct sip_msg* msg, struct sdp_session *sess) {
 						cp = eat_token_end(cp, lend);
 						lvalue.len = cp-lvalue.s;
 						if (name2enum(&lvalue, &supported_protocols) >= 0) {
-							sess->media[sess->media_count-1].active = cline_ip_s.len != 0;  /* IP may by specified by hostname */
-							sess->media[sess->media_count-1].ip_s = cline_ip_s;
+							sess->media[sess->media_count-1].active = sess->cline_ip_s.len != 0;  /* IP may by specified by hostname */
+							sess->media[sess->media_count-1].ip_s = sess->cline_ip_s;
 							sess->media[sess->media_count-1].ip = cline_ip;
 						}
 						/* get payload types */
@@ -826,7 +831,7 @@ static int parse_sdp_content(struct sip_msg* msg, struct sdp_session *sess) {
 				break;
 			case 'a':
 				if (sess_fl == 2 && sess->media[sess->media_count-1].port == 0) {
-					 break;  /* ignore parameters of disabled media section */
+					break;  /* ignore parameters of disabled media section */
 				}
 				i = name2enum(&lvalue, &send_rec_modifiers);
 				if (i > 0) {
@@ -1000,15 +1005,17 @@ static int update_sdp_content(struct sip_msg* msg, int gate_a_to_b, struct sdp_s
 	/* do topo hiding if all media are disabled for c= line then set address 0.0.0.0 to hide UA location */
 	for (i=0; i<sdp_sess->media_count; i++) {
 		if (sdp_sess->media[i].ip && (!sdp_sess->media[i].active || ipt_sess->sdp_media[i] < 0)) { /* not affected but previous loop */
-			for (j=0; j<i; j++) {
-				if (sdp_sess->media[i].ip_s.s == sdp_sess->media[j].ip_s.s) {
-					goto cline_fixed2;  /* must be already updated */
+			if (sdp_sess->media[i].ip_s.s == sdp_sess->cline_ip_s.s) {  /* the c= may be shared */
+				for (j=0; j<i; j++) {
+					if (sdp_sess->media[i].ip_s.s == sdp_sess->media[j].ip_s.s) {
+						goto cline_fixed2;  /* must be already updated */
+					}
 				}
-			}
-			for (j=i+1; j<sdp_sess->media_count; j++) {
-				if (sdp_sess->media[i].ip_s.s == sdp_sess->media[j].ip_s.s && /* same c= line */
-				    sdp_sess->media[i].active && ipt_sess->sdp_media[i] >= 0) {  /* has media enabled */
-					goto cline_fixed2;
+				for (j=i+1; j<sdp_sess->media_count; j++) {
+					if (sdp_sess->media[i].ip_s.s == sdp_sess->media[j].ip_s.s && /* same c= line */
+						sdp_sess->media[i].active && ipt_sess->sdp_media[i] >= 0) {  /* has media enabled */
+						goto cline_fixed2;
+					}
 				}
 			}
 			/* apply lump for ip address in c= line */
@@ -1019,6 +1026,20 @@ static int update_sdp_content(struct sip_msg* msg, int gate_a_to_b, struct sdp_s
 			;
 		}
 	}
+	/* do topo hiding of session c= lines which are not used in any media section */
+	if (sdp_sess->cline_ip_s.len > 0) {		
+		for (i=0; i<sdp_sess->media_count; i++) {
+			if (sdp_sess->media[i].ip && sdp_sess->media[i].ip_s.s == sdp_sess->cline_ip_s.s) {
+				goto cline_fixed3;
+			}
+		}
+		/* apply lump for ip address in c= line */
+		ip42s(0, &s);
+		if (prepare_lumps(msg, &sdp_sess->cline_ip_s, &s) < 0)
+			return -1;		
+	cline_fixed3:
+		;
+	}
 
 
 	if (sdp_sess->oline_addr_s.s) {  /* o= line exists */
@@ -1214,14 +1235,14 @@ inline static void fill_in_session(int flags, int media_idx, struct sdp_session
 			in_session->dir[GATE_A_TO_B(flags)].stream[j].source.ip = sdp_sess->media[media_idx].ip;
 			in_session->dir[GATE_A_TO_B(flags)].stream[j].source.port = sdp_sess->media[media_idx].port+j;
 		}
-		if (global_params.learning_timeout > 0) {
+		if (global_params.learning_timeout >= 0) {
 			in_session->dir[GATE_A_TO_B(flags)].stream[j].flags |= XT_RTPPROXY_SOCKOPT_FLAG_SESSION_LEARNING_TIMEOUT;
 			in_session->dir[GATE_A_TO_B(flags)].stream[j].learning_timeout = global_params.learning_timeout; 
 		}
 	}
-	if (global_params.always_learn >= 0) {
-		in_session->dir[GATE_A_TO_B(flags)].always_learn = global_params.always_learn!=0;
-		in_session->dir[GATE_A_TO_B(flags)].flags |= XT_RTPPROXY_SOCKOPT_FLAG_ALWAYS_LEARN;
+	if (global_params.always_flags >= 0) {
+		in_session->dir[GATE_A_TO_B(flags)].always_flags = global_params.always_flags;
+		in_session->dir[GATE_A_TO_B(flags)].flags |= XT_RTPPROXY_SOCKOPT_FLAG_ALWAYS_FLAGS;
 	}
 	if (global_params.expiration_timeout > 0) {
 		in_session->sh.expires_timeout = global_params.expiration_timeout;
@@ -1272,7 +1293,7 @@ static int rtpproxy_alloc(struct sip_msg* msg, char* _flags, char* _dummy) {
 	}
 	if (check_parse_sdp_content(msg, &global_sdp_sess) < 0) return -1;
 
-ERR("RTPPROXY_DEBUG: sdp.media_count: %d, flags: %d\n", global_sdp_sess.media_count, flags);
+ERR("RTPPROXY_DEBUG: flags: %d\n", flags);
 	if (global_params.protected_sess.switchboard) {  /* any protected ? */
 		/* get session source address from kernel module and compare with SDP content */
 		for (i = 0; i < global_params.protected_sess.session_count; i++) {
@@ -1804,6 +1825,12 @@ static int rtpproxy_authorize_media(struct sip_msg* msg, char* _dummy1, char* _d
 		if (fl) {
 			goto remove_stream;
 		}
+		for (j=0; j<=1 && !global_params.auth_throttling; j++) {
+			if (global_params.codec_set->media_types[global_sdp_sess.media[i].media_type].throttle.bandwidth[j].bytes > 0 ||
+				global_params.codec_set->media_types[global_sdp_sess.media[i].media_type].throttle.bandwidth[j].packets > 0) {
+				global_params.auth_throttling = 1;				
+			}
+		}
 		for (j=0; j<global_sdp_sess.media[i].codec_count; j++) { 
 			unsigned int r;
 			struct sdp_codec *c;
@@ -1873,8 +1900,8 @@ static int rtpproxy_set_param(struct sip_msg* msg, char* _idx, char* _value) {
 		case PAR_LEARNING_TIMEOUT:
 			global_params.learning_timeout = u.i;
 			break;
-		case PAR_ALWAYS_LEARN:
-			global_params.always_learn = u.i;
+		case PAR_ALWAYS_FLAGS:
+			global_params.always_flags = u.i;
 			break;
 		case PAR_SWITCHBOARD_A:
 		case PAR_SWITCHBOARD_B:
@@ -1932,6 +1959,10 @@ static int rtpproxy_set_param(struct sip_msg* msg, char* _idx, char* _value) {
 			global_params.remove_codec_mask = u.i;
 			break;
 
+		case PAR_AUTH_THROTTLING:
+			global_params.auth_throttling = u.i;
+			break;
+
 		case PAR_OLINE_USER: {
 				static char buf[30];
 				if (u.s.len > sizeof(buf)) {
@@ -2001,8 +2032,8 @@ static int sel_rtpproxy(str* res, select_t* s, struct sip_msg* msg) {  /* dummy
 		case PAR_LEARNING_TIMEOUT:
 			u.i = global_params.learning_timeout;
 			break;
-		case PAR_ALWAYS_LEARN:
-			u.i = global_params.always_learn;
+		case PAR_ALWAYS_FLAGS:
+			u.i = global_params.always_flags;
 			break;
 		case PAR_SWITCHBOARD_A:
 		case PAR_SWITCHBOARD_B:
@@ -2060,6 +2091,9 @@ static int sel_rtpproxy(str* res, select_t* s, struct sip_msg* msg) {  /* dummy
 		case PAR_REMOVE_CODEC_MASK:
 			u.i = global_params.remove_codec_mask;
 			break;
+		case PAR_AUTH_THROTTLING:
+			u.i = global_params.auth_throttling;
+			break;
 		default:
 			;
 	}
@@ -2080,7 +2114,8 @@ select_row_t sel_declaration[] = {
 
 static int mod_pre_script_cb(struct sip_msg *msg, unsigned int flags, void *param) {
 	memset(&global_params, 0, sizeof(global_params));
-	global_params.always_learn = -1;
+	global_params.always_flags = -1;
+	global_params.learning_timeout = -1;
 	global_params.ttl = -1;
 	sdp_parsed = 999; /* any number not in (-1,0) */
 	return 1;