Pārlūkot izejas kodu

rtpengine: consume MOS values reported back by RTP proxy

Richard Fuchs 8 gadi atpakaļ
vecāks
revīzija
1aa2e58fb5
2 mainītis faili ar 1683 papildinājumiem un 17 dzēšanām
  1. 1266 1
      src/modules/rtpengine/doc/rtpengine_admin.xml
  2. 417 16
      src/modules/rtpengine/rtpengine.c

Failā izmaiņas netiks attēlotas, jo tās ir par lielu
+ 1266 - 1
src/modules/rtpengine/doc/rtpengine_admin.xml


+ 417 - 16
src/modules/rtpengine/rtpengine.c

@@ -128,6 +128,41 @@ static const char *command_strings[] = {
 	[OP_PING]		= "ping",
 };
 
+struct minmax_mos_stats {
+	str mos_param;
+	str at_param;
+	str packetloss_param;
+	str jitter_param;
+	str roundtrip_param;
+	str samples_param;
+
+	pv_elem_t *mos_pv;
+	pv_elem_t *at_pv;
+	pv_elem_t *packetloss_pv;
+	pv_elem_t *jitter_pv;
+	pv_elem_t *roundtrip_pv;
+	pv_elem_t *samples_pv;
+};
+struct minmax_mos_label_stats {
+	int got_any_pvs;
+
+	str label_param;
+	pv_elem_t *label_pv;
+
+	struct minmax_mos_stats min,
+				max,
+				average;
+};
+struct minmax_stats_vals {
+	long long mos;
+	long long at;
+	long long packetloss;
+	long long jitter;
+	long long roundtrip;
+	long long samples;
+	long long avg_samples; /* our own running count to average the averages */
+};
+
 static char *gencookie();
 static int rtpp_test(struct rtpp_node*, int, int);
 static int start_recording_f(struct sip_msg *, char *, char *);
@@ -135,6 +170,7 @@ static int rtpengine_answer1_f(struct sip_msg *, char *, char *);
 static int rtpengine_offer1_f(struct sip_msg *, char *, char *);
 static int rtpengine_delete1_f(struct sip_msg *, char *, char *);
 static int rtpengine_manage1_f(struct sip_msg *, char *, char *);
+static int rtpengine_query1_f(struct sip_msg *, char *, char *);
 
 static int parse_flags(struct ng_flags_parse *, struct sip_msg *, enum rtpe_operation *, const char *);
 
@@ -167,6 +203,9 @@ static int rtpp_test_ping(struct rtpp_node *node);
 /* Pseudo-Variables */
 static int pv_get_rtpstat_f(struct sip_msg *, pv_param_t *, pv_value_t *);
 static int set_rtp_inst_pvar(struct sip_msg *msg, const str * const uri);
+static int pv_parse_var(str *inp, pv_elem_t **outp, int *got_any);
+static int mos_label_stats_parse(struct minmax_mos_label_stats *mmls);
+static void parse_call_stats(bencode_item_t *, struct sip_msg *);
 
 static int rtpengine_allow_op = 0;
 static struct rtpp_node **queried_nodes_ptr = NULL;
@@ -223,11 +262,16 @@ typedef struct rtpp_set_link {
 /* tm */
 static struct tm_binds tmb;
 
-/*0-> disabled, 1 ->enabled*/
-unsigned int *natping_state=0;
-
 static pv_elem_t *extra_id_pv = NULL;
 
+
+static struct minmax_mos_label_stats global_mos_stats,
+				     side_A_mos_stats,
+				     side_B_mos_stats;
+int got_any_mos_pvs;
+
+
+
 static cmd_export_t cmds[] = {
 	{"set_rtpengine_set",	(cmd_function)set_rtpengine_set_f,	1,
 		fixup_set_id, 0,
@@ -262,6 +306,12 @@ static cmd_export_t cmds[] = {
 	{"rtpengine_delete",	(cmd_function)rtpengine_delete1_f,	1,
 		fixup_spve_null, 0,
 		ANY_ROUTE},
+	{"rtpengine_query",	(cmd_function)rtpengine_query1_f,	0,
+		0, 0,
+		ANY_ROUTE},
+	{"rtpengine_query",	(cmd_function)rtpengine_query1_f,	1,
+		fixup_spve_null, 0,
+		ANY_ROUTE},
 	{0, 0, 0, 0, 0, 0}
 };
 
@@ -294,6 +344,61 @@ static param_export_t params[] = {
 	{"hash_table_tout",       INT_PARAM, &hash_table_tout        },
 	{"hash_table_size",       INT_PARAM, &hash_table_size        },
 	{"setid_default",         INT_PARAM, &setid_default          },
+
+	/* MOS stats output */
+	/* global averages */
+	{"mos_min_pv",                PARAM_STR, &global_mos_stats.min.mos_param             },
+	{"mos_min_at_pv",             PARAM_STR, &global_mos_stats.min.at_param              },
+	{"mos_min_packetloss_pv",     PARAM_STR, &global_mos_stats.min.packetloss_param      },
+	{"mos_min_jitter_pv",         PARAM_STR, &global_mos_stats.min.jitter_param          },
+	{"mos_min_roundtrip_pv",      PARAM_STR, &global_mos_stats.min.roundtrip_param       },
+	{"mos_max_pv",                PARAM_STR, &global_mos_stats.max.mos_param             },
+	{"mos_max_at_pv",             PARAM_STR, &global_mos_stats.max.at_param              },
+	{"mos_max_packetloss_pv",     PARAM_STR, &global_mos_stats.max.packetloss_param      },
+	{"mos_max_jitter_pv",         PARAM_STR, &global_mos_stats.max.jitter_param          },
+	{"mos_max_roundtrip_pv",      PARAM_STR, &global_mos_stats.max.roundtrip_param       },
+	{"mos_average_pv",            PARAM_STR, &global_mos_stats.average.mos_param         },
+	{"mos_average_packetloss_pv", PARAM_STR, &global_mos_stats.average.packetloss_param  },
+	{"mos_average_jitter_pv",     PARAM_STR, &global_mos_stats.average.jitter_param      },
+	{"mos_average_roundtrip_pv",  PARAM_STR, &global_mos_stats.average.roundtrip_param   },
+	{"mos_average_samples_pv",    PARAM_STR, &global_mos_stats.average.samples_param     },
+
+	/* designated side A */
+	{"mos_A_label_pv",              PARAM_STR, &side_A_mos_stats.label_param               },
+	{"mos_min_A_pv",                PARAM_STR, &side_A_mos_stats.min.mos_param             },
+	{"mos_min_at_A_pv",             PARAM_STR, &side_A_mos_stats.min.at_param              },
+	{"mos_min_packetloss_A_pv",     PARAM_STR, &side_A_mos_stats.min.packetloss_param      },
+	{"mos_min_jitter_A_pv",         PARAM_STR, &side_A_mos_stats.min.jitter_param          },
+	{"mos_min_roundtrip_A_pv",      PARAM_STR, &side_A_mos_stats.min.roundtrip_param       },
+	{"mos_max_A_pv",                PARAM_STR, &side_A_mos_stats.max.mos_param             },
+	{"mos_max_at_A_pv",             PARAM_STR, &side_A_mos_stats.max.at_param              },
+	{"mos_max_packetloss_A_pv",     PARAM_STR, &side_A_mos_stats.max.packetloss_param      },
+	{"mos_max_jitter_A_pv",         PARAM_STR, &side_A_mos_stats.max.jitter_param          },
+	{"mos_max_roundtrip_A_pv",      PARAM_STR, &side_A_mos_stats.max.roundtrip_param       },
+	{"mos_average_A_pv",            PARAM_STR, &side_A_mos_stats.average.mos_param         },
+	{"mos_average_packetloss_A_pv", PARAM_STR, &side_A_mos_stats.average.packetloss_param  },
+	{"mos_average_jitter_A_pv",     PARAM_STR, &side_A_mos_stats.average.jitter_param      },
+	{"mos_average_roundtrip_A_pv",  PARAM_STR, &side_A_mos_stats.average.roundtrip_param   },
+	{"mos_average_samples_A_pv",    PARAM_STR, &side_A_mos_stats.average.samples_param     },
+
+	/* designated side B */
+	{"mos_B_label_pv",              PARAM_STR, &side_B_mos_stats.label_param               },
+	{"mos_min_B_pv",                PARAM_STR, &side_B_mos_stats.min.mos_param             },
+	{"mos_min_at_B_pv",             PARAM_STR, &side_B_mos_stats.min.at_param              },
+	{"mos_min_packetloss_B_pv",     PARAM_STR, &side_B_mos_stats.min.packetloss_param      },
+	{"mos_min_jitter_B_pv",         PARAM_STR, &side_B_mos_stats.min.jitter_param          },
+	{"mos_min_roundtrip_B_pv",      PARAM_STR, &side_B_mos_stats.min.roundtrip_param       },
+	{"mos_max_B_pv",                PARAM_STR, &side_B_mos_stats.max.mos_param             },
+	{"mos_max_at_B_pv",             PARAM_STR, &side_B_mos_stats.max.at_param              },
+	{"mos_max_packetloss_B_pv",     PARAM_STR, &side_B_mos_stats.max.packetloss_param      },
+	{"mos_max_jitter_B_pv",         PARAM_STR, &side_B_mos_stats.max.jitter_param          },
+	{"mos_max_roundtrip_B_pv",      PARAM_STR, &side_B_mos_stats.max.roundtrip_param       },
+	{"mos_average_B_pv",            PARAM_STR, &side_B_mos_stats.average.mos_param         },
+	{"mos_average_packetloss_B_pv", PARAM_STR, &side_B_mos_stats.average.packetloss_param  },
+	{"mos_average_jitter_B_pv",     PARAM_STR, &side_B_mos_stats.average.jitter_param      },
+	{"mos_average_roundtrip_B_pv",  PARAM_STR, &side_B_mos_stats.average.roundtrip_param   },
+	{"mos_average_samples_B_pv",    PARAM_STR, &side_B_mos_stats.average.samples_param     },
+
 	{0, 0, 0}
 };
 
@@ -1371,15 +1476,15 @@ mod_init(void)
 		}
 	}
 
-	if (extra_id_pv_param.s && *extra_id_pv_param.s) {
-		extra_id_pv_param.len = strlen(extra_id_pv_param.s);
-		if(pv_parse_format(&extra_id_pv_param, &extra_id_pv) < 0) {
-			LM_ERR("malformed PV string: %s\n", extra_id_pv_param.s);
-			return -1;
-		}
-	} else {
-		extra_id_pv = NULL;
-	}
+	if (pv_parse_var(&extra_id_pv_param, &extra_id_pv, NULL))
+		return -1;
+
+	if (mos_label_stats_parse(&global_mos_stats))
+		return -1;
+	if (mos_label_stats_parse(&side_A_mos_stats))
+		return -1;
+	if (mos_label_stats_parse(&side_B_mos_stats))
+		return -1;
 
 	if (setid_avp_param) {
 		s.s = setid_avp_param; s.len = strlen(s.s);
@@ -1578,6 +1683,55 @@ rptest:
 	return 0;
 }
 
+static int pv_parse_var(str *inp, pv_elem_t **outp, int *got_any) {
+	if (inp->s && *inp->s) {
+		inp->len = strlen(inp->s);
+		if(pv_parse_format(inp, outp) < 0) {
+			LM_ERR("malformed PV string: %s\n", inp->s);
+			return -1;
+		}
+		if (got_any)
+			*got_any = 1;
+	} else {
+		*outp = NULL;
+	}
+	return 0;
+}
+
+static int minmax_pv_parse(struct minmax_mos_stats *s, int *got_any) {
+	if (pv_parse_var(&s->mos_param, &s->mos_pv, got_any))
+		return -1;
+	if (pv_parse_var(&s->at_param, &s->at_pv, got_any))
+		return -1;
+	if (pv_parse_var(&s->packetloss_param, &s->packetloss_pv, got_any))
+		return -1;
+	if (pv_parse_var(&s->jitter_param, &s->jitter_pv, got_any))
+		return -1;
+	if (pv_parse_var(&s->roundtrip_param, &s->roundtrip_pv, got_any))
+		return -1;
+	if (pv_parse_var(&s->samples_param, &s->samples_pv, got_any))
+		return -1;
+	return 0;
+}
+
+static int mos_label_stats_parse(struct minmax_mos_label_stats *mmls) {
+	if (pv_parse_var(&mmls->label_param, &mmls->label_pv, &mmls->got_any_pvs))
+		return -1;
+
+	if (minmax_pv_parse(&mmls->min, &mmls->got_any_pvs))
+		return -1;
+	if (minmax_pv_parse(&mmls->max, &mmls->got_any_pvs))
+		return -1;
+	if (minmax_pv_parse(&mmls->average, &mmls->got_any_pvs))
+		return -1;
+
+	if (mmls->got_any_pvs)
+		got_any_mos_pvs = 1;
+
+	return 0;
+}
+
+
 static int
 child_init(int rank)
 {
@@ -1613,9 +1767,6 @@ static void mod_destroy(void)
 	struct rtpp_node * crt_rtpp, *last_rtpp;
 
 	/*free the shared memory*/
-	if (natping_state)
-		shm_free(natping_state);
-
 	if (rtpp_no) {
 		shm_free(rtpp_no);
 		rtpp_no = NULL;
@@ -2628,8 +2779,241 @@ set_rtpengine_set_from_avp(struct sip_msg *msg, int direction)
 	return 1;
 }
 
+static void avp_print_s(pv_elem_t *pv, char *str, int len, struct sip_msg *msg) {
+	pv_value_t val;
+
+	if (!pv)
+		return;
+
+	memset(&val, 0, sizeof(val));
+	val.flags = PV_VAL_STR;
+	val.rs.s = str;
+	val.rs.len = len;
+	pv->spec->setf(msg, &pv->spec->pvp, EQ_T, &val);
+}
+
+static void avp_print_decimal(pv_elem_t *pv, int num, struct sip_msg *msg) {
+	int len;
+	char buf[8];
+
+	len = snprintf(buf, sizeof(buf), "%i.%i", num / 10, abs(num % 10));
+	avp_print_s(pv, buf, len, msg);
+}
+static void avp_print_int(pv_elem_t *pv, int num, struct sip_msg *msg) {
+	int len;
+	char buf[8];
+
+	len = snprintf(buf, sizeof(buf), "%i", num);
+	avp_print_s(pv, buf, len, msg);
+}
+static void avp_print_time(pv_elem_t *pv, int num, struct sip_msg *msg) {
+	int len;
+	char buf[8];
+
+	len = snprintf(buf, sizeof(buf), "%i:%02i", num / 60, abs(num % 60));
+	avp_print_s(pv, buf, len, msg);
+}
+
+static void avp_print_mos(struct minmax_mos_stats *s, struct minmax_stats_vals *vals, long long created,
+		struct sip_msg *msg)
+{
+	if (!vals->avg_samples)
+		return;
+
+	avp_print_decimal(s->mos_pv, vals->mos / vals->avg_samples, msg);
+	avp_print_time(s->at_pv, vals->at - created, msg);
+	avp_print_int(s->packetloss_pv, vals->packetloss / vals->avg_samples, msg);
+	avp_print_int(s->jitter_pv, vals->jitter / vals->avg_samples, msg);
+	avp_print_int(s->roundtrip_pv, vals->roundtrip / vals->avg_samples, msg);
+	avp_print_int(s->samples_pv, vals->samples / vals->avg_samples, msg);
+}
+
+static int decode_mos_vals_dict(struct minmax_stats_vals *vals, bencode_item_t *dict, const char *key) {
+	bencode_item_t *mos_ent;
+
+	mos_ent = bencode_dictionary_get_expect(dict, key, BENCODE_DICTIONARY);
+	if (!mos_ent)
+		return 0;
+
+	vals->mos = bencode_dictionary_get_integer(mos_ent, "MOS", -1);
+	vals->at = bencode_dictionary_get_integer(mos_ent, "reported at", -1);
+	vals->packetloss = bencode_dictionary_get_integer(mos_ent, "packet loss", -1);
+	vals->jitter = bencode_dictionary_get_integer(mos_ent, "jitter", -1);
+	vals->roundtrip = bencode_dictionary_get_integer(mos_ent, "round-trip time", -1);
+	vals->samples = bencode_dictionary_get_integer(mos_ent, "samples", -1);
+	vals->avg_samples = 1;
+
+	return 1;
+}
+
+static void parse_call_stats_1(struct minmax_mos_label_stats *mmls, bencode_item_t *dict,
+		struct sip_msg *msg)
+{
+	long long created;
+	str label, check;
+	long long ssrcs[4];
+	unsigned int num_ssrcs = 0, i;
+	long long ssrc;
+	char *endp;
+	bencode_item_t *ssrc_list,
+		       *ssrc_key,
+		       *ssrc_dict,
+		       *tags,
+		       *tag_key,
+		       *tag_dict,
+		       *medias,
+		       *media,
+		       *streams,
+		       *stream;
+	struct minmax_stats_vals min_vals = { .mos = 100 },
+				 max_vals = { .mos = -1  },
+				 average_vals = { .avg_samples = 0 },
+				 vals_decoded;
+
+	if (!mmls->got_any_pvs)
+		return;
+
+	/* check if only a subset of info is requested */
+	if (!mmls->label_pv)
+		goto ssrcs_done;
+
+	if (pv_printf_s(msg, mmls->label_pv, &label)) {
+		LM_ERR("error printing label PV\n");
+		return;
+	}
+	LM_DBG("rtpengine: looking for label '%.*s'\n", label.len, label.s);
+
+	/* walk through tags to find the label we're looking for */
+	tags = bencode_dictionary_get_expect(dict, "tags", BENCODE_DICTIONARY);
+	if (!tags)
+		return; /* label wanted but no tags found - return nothing */
+	LM_DBG("rtpengine: XXX got tags\n");
+
+	for (tag_key = tags->child; tag_key; tag_key = tag_key->sibling->sibling) {
+		LM_DBG("rtpengine: XXX got tag\n");
+		tag_dict = tag_key->sibling;
+		/* compare label */
+		if (!bencode_dictionary_get_str(tag_dict, "label", &check))
+			continue;
+		LM_DBG("rtpengine: XXX got label %.*s\n", check.len, check.s);
+		if (str_cmp(&check, &label))
+			continue;
+		LM_DBG("rtpengine: XXX label match\n");
+		medias = bencode_dictionary_get_expect(tag_dict, "medias", BENCODE_LIST);
+		if (!medias)
+			continue;
+		LM_DBG("rtpengine: XXX got medias\n");
+		for (media = medias->child; media; media = media->sibling) {
+			LM_DBG("rtpengine: XXX got media\n");
+			streams = bencode_dictionary_get_expect(media, "streams", BENCODE_LIST);
+			if (!streams)
+				continue;
+			LM_DBG("rtpengine: XXX got streams\n");
+			/* only check the first stream (RTP) */
+			stream = streams->child;
+			if (!stream)
+				continue;
+			LM_DBG("rtpengine: XXX got stream type %i\n", stream->type);
+			LM_DBG("rtpengine: XXX stream child '%.*s'\n", (int) stream->child->iov[1].iov_len, (char *) stream->child->iov[1].iov_base);
+			LM_DBG("rtpengine: XXX stream child val type %i\n", stream->child->sibling->type);
+			if ((ssrc = bencode_dictionary_get_integer(stream, "SSRC", -1)) == -1)
+				continue;
+			/* got a valid SSRC to watch for */
+			ssrcs[num_ssrcs] = ssrc;
+			LM_DBG("rtpengine: found SSRC '%lli' for label '%.*s'\n",
+					ssrc,
+					label.len, label.s);
+			num_ssrcs++;
+			/* see if we can do more */
+			if (num_ssrcs >= (sizeof(ssrcs) / sizeof(*ssrcs)))
+				goto ssrcs_done;
+		}
+	}
+	/* if we get here, we were looking for label. see if we found one. if not, return nothing */
+	if (num_ssrcs == 0)
+		return;
+
+ssrcs_done:
+	/* now look for the stats values */
+	created = bencode_dictionary_get_integer(dict, "created", 0);
+	ssrc_list = bencode_dictionary_get_expect(dict, "SSRC", BENCODE_DICTIONARY);
+	if (!ssrc_list)
+		return;
+
+	for (ssrc_key = ssrc_list->child; ssrc_key; ssrc_key = ssrc_key->sibling->sibling) {
+		/* see if this is a SSRC we're interested in */
+		if (num_ssrcs == 0)
+			goto ssrc_ok;
+		if (!bencode_get_str(ssrc_key, &check))
+			continue;
+		ssrc = strtoll(check.s, &endp, 10);
+		for (i = 0; i < num_ssrcs; i++) {
+			if (ssrcs[i] != ssrc)
+				continue;
+			/* it's a match */
+			LM_DBG("rtpengine: considering SSRC '%.*s'\n",
+					check.len, check.s);
+			goto ssrc_ok;
+		}
+		/* no match */
+		continue;
+
+ssrc_ok:
+		ssrc_dict = ssrc_key->sibling;
+		if (!ssrc_dict)
+			continue;
+
+		if (decode_mos_vals_dict(&vals_decoded, ssrc_dict, "average MOS")) {
+			average_vals.avg_samples++;
+			average_vals.mos += vals_decoded.mos;
+			average_vals.packetloss += vals_decoded.packetloss;
+			average_vals.jitter += vals_decoded.jitter;
+			average_vals.roundtrip += vals_decoded.roundtrip;
+			average_vals.samples += vals_decoded.samples;
+		}
+
+		if (decode_mos_vals_dict(&vals_decoded, ssrc_dict, "highest MOS")) {
+			if (vals_decoded.mos > max_vals.mos)
+				max_vals = vals_decoded;
+		}
+		if (decode_mos_vals_dict(&vals_decoded, ssrc_dict, "lowest MOS")) {
+			if (vals_decoded.mos < min_vals.mos)
+				min_vals = vals_decoded;
+		}
+	}
+
+	avp_print_mos(&mmls->max, &max_vals, created, msg);
+	avp_print_mos(&mmls->min, &min_vals, created, msg);
+	avp_print_mos(&mmls->average, &average_vals, created, msg);
+}
+
+static void parse_call_stats(bencode_item_t *dict, struct sip_msg *msg) {
+	if (!got_any_mos_pvs)
+		return;
+
+	parse_call_stats_1(&global_mos_stats, dict, msg);
+	parse_call_stats_1(&side_A_mos_stats, dict, msg);
+	parse_call_stats_1(&side_B_mos_stats, dict, msg);
+}
+
 static int rtpengine_delete(struct sip_msg *msg, const char *flags) {
-	return rtpp_function_call_simple(msg, OP_DELETE, flags);
+	bencode_buffer_t bencbuf;
+	bencode_item_t *ret = rtpp_function_call_ok(&bencbuf, msg, OP_DELETE, flags, NULL);
+	if (!ret)
+		return -1;
+	parse_call_stats(ret, msg);
+	bencode_buffer_free(&bencbuf);
+	return 1;
+}
+
+static int rtpengine_query(struct sip_msg *msg, const char *flags) {
+	bencode_buffer_t bencbuf;
+	bencode_item_t *ret = rtpp_function_call_ok(&bencbuf, msg, OP_QUERY, flags, NULL);
+	if (!ret)
+		return -1;
+	parse_call_stats(ret, msg);
+	bencode_buffer_free(&bencbuf);
+	return 1;
 }
 
 static int rtpengine_rtpp_set_wrap(struct sip_msg *msg, int (*func)(struct sip_msg *msg, void *, int),
@@ -2678,6 +3062,23 @@ rtpengine_delete1_f(struct sip_msg* msg, char* str1, char* str2)
 	return rtpengine_rtpp_set_wrap(msg, rtpengine_delete_wrap, flags.s, 1);
 }
 
+static int rtpengine_query_wrap(struct sip_msg *msg, void *d, int more) {
+	return rtpengine_query(msg, d);
+}
+
+static int
+rtpengine_query1_f(struct sip_msg* msg, char* str1, char* str2)
+{
+	str flags;
+
+	flags.s = NULL;
+	if (str1)
+		get_str_fparam(&flags, msg, (fparam_t *) str1);
+
+	return rtpengine_rtpp_set_wrap(msg, rtpengine_query_wrap, flags.s, 1);
+}
+
+
 /* This function assumes p points to a line of requested type. */
 
 static int

Daži faili netika attēloti, jo izmaiņu fails ir pārāk liels