Browse Source

modules/tm: added outbound support to t_load_contact()/t_next_contacts()

- added new function t_next_contact_flows()
- readme not updated yet
Juha Heinanen 12 years ago
parent
commit
74a9baf6fd
3 changed files with 661 additions and 302 deletions
  1. 646 298
      modules/tm/t_serial.c
  2. 2 0
      modules/tm/t_serial.h
  3. 13 4
      modules/tm/tm.c

+ 646 - 298
modules/tm/t_serial.c

@@ -48,12 +48,18 @@ struct contact {
     qvalue_t q;
     str dst_uri;
     str path;
-    unsigned int flags;
     struct socket_info* sock;
+    str instance;
+    unsigned int flags;
     unsigned short q_flag;
     struct contact *next;
 };
 
+struct instance_list {
+    str instance;
+    struct instance_list *next;
+};
+
 /* 
  * Frees contact list used by load_contacts function
  */
@@ -66,151 +72,74 @@ static inline void free_contact_list(struct contact *curr) {
     }
 }
 
-/* Encode branch info from contact struct to str */
-static inline int encode_branch_info(str *info, struct contact *con)
-{
-    char *at, *s;
-    int len;
-
-    info->len = con->uri.len + con->dst_uri.len +
-		con->path.len + MAX_SOCKET_STR + INT2STR_MAX_LEN + 5;
-    info->s = pkg_malloc(info->len);
-    if (!info->s) {
-		LM_ERR("no memory left for branch info\n");
-		return 0;
-    }
-    at = info->s;
-    append_str(at, con->uri.s, con->uri.len);
-    append_chr(at, '\n');
-    append_str(at, con->dst_uri.s, con->dst_uri.len);
-    append_chr(at, '\n');
-    append_str(at, con->path.s, con->path.len);
-    append_chr(at, '\n');
-    if (con->sock) {
-		len = MAX_SOCKET_STR;
-		if (socket2str(at, &len, con->sock) < 0) {
-			LM_ERR("failed to convert socket to str\n");
-			return 0;
-		}
-    } else {
-		len = 0;
+/* 
+ * Frees instance list used by next_contacts function
+ */
+static inline void free_instance_list(struct instance_list *curr) {
+    struct instance_list *prev;
+    while (curr) {
+	pkg_free(curr->instance.s);
+	prev = curr;
+	curr = curr->next;
+	pkg_free(prev);
     }
-    at = at + len;
-    append_chr(at, '\n');
-    s = int2str(con->flags, &len);
-    append_str(at, s, len);
-    append_chr(at, '\n');
-    info->len = at - info->s + 1;
-
-    return 1;
 }
 
+static str uri_name = {"uri", 3};
+static str dst_uri_name = {"dst_uri", 7};
+static str path_name = {"path", 4};
+static str sock_name = {"sock", 4};
+static str instance_name = {"instance", 8};
+static str flags_name = {"flags", 5};
+static str q_flag_name = {"q_flag", 6};
 
-/* Encode branch info from str */
-static inline int decode_branch_info(char *info, str *uri, str *dst, str *path,
-									 struct socket_info **sock,
-									 unsigned int *flags)
+void add_contacts_avp(str *uri, str *dst_uri, str *path, str *sock_str,
+		      unsigned int flags, unsigned int q_flag, str *instance)
 {
-    str s, host;
-    int port, proto;
-    char *pos, *at, *tmp;
+    sr_xavp_t *record;
+    sr_xval_t val;
 
-	if (info == NULL) {
-		ERR("decode_branch_info: Invalid input string.\n");
-		return 0;
-	}
-	
-	/* Reset or return arguments to sane defaults */
-	uri->s = 0; uri->len = 0;
-	dst->s = 0; dst->len = 0;
-	path->s = 0; path->len = 0;
-	*sock = NULL;
-	*flags = 0;
-	
-	/* Make sure that we have at least a non-empty URI string, it is fine if
-	 * everything else is missing, but we need at least the URI. */
-	uri->s = info; 
-	if ((pos = strchr(info, '\n')) == NULL) { 
-		uri->len = strlen(info); 
-			/* We don't even have the URI string, this is bad, report an
-			 * error. */
-		if (uri->len == 0) goto uri_missing;
-		return 1;
-    }
-	uri->len = pos - info;
-	if (uri->len == 0) goto uri_missing;
-
-	/* If we get here we have at least the branch URI, now try to parse as
-	 * much as you can. All output variable have been initialized above, so it
-	 * is OK if any of the fields are missing from now on. */
-    dst->s = at = pos + 1;
-    if ((pos = strchr(at, '\n')) == NULL) {
-		dst->len = strlen(dst->s);
-		return 1;
-    }
-	dst->len = pos - at;
-
-    path->s = at = pos + 1;
-    if ((pos = strchr(at, '\n')) == NULL) {
-		path->len = strlen(path->s);
-		return 1;
-    }
-    path->len = pos - at;
-
-    s.s = at = pos + 1;
-    if ((pos = strchr(at, '\n')) == NULL) {
-		/* No LF found, that means we take the string till the final zero
-		 * termination character and pass it directly to parse_phostport
-		 * without making a zero-terminated copy. */
-		tmp = s.s;
-		s.len = strlen(s.s);
-	} else {
-		/* Our string is terminated by LF, so we need to make a
-		 * zero-terminated copy of the string before we pass it to
-		 * parse_phostport. */
-		s.len = pos - at;
-		if ((tmp = as_asciiz(&s)) == NULL) {
-			ERR("No memory left\n");
-			return 0;
-		}
-	}	
-	if (s.len) {
-		if (parse_phostport(tmp, &host.s, &host.len,
-							&port, &proto) != 0) {
-			LM_ERR("parsing of socket info <%s> failed\n", tmp);
-			if (pos) pkg_free(tmp);
-			return 0;
-		}
+    record = NULL;
 
-		*sock = grep_sock_info(&host, (unsigned short)port,
-							   (unsigned short)proto);
-		if (*sock == 0) {
-			LM_ERR("invalid socket <%s>\n", tmp);
-			if (pos) pkg_free(tmp);
-			return 0;
-		}
-	}
-	
-	if (pos) pkg_free(tmp);
-	else return 1;
-	
-    s.s = at = pos + 1;
-    if ((pos = strchr(at, '\n')) == NULL) s.len = strlen(s.s);
-    else s.len = pos - s.s;
-
-    if (s.len) {
-		if (str2int(&s, flags) != 0) {
-			LM_ERR("failed to decode flags <%.*s>\n", STR_FMT(&s));
-			return 0;
-		}
+    val.type = SR_XTYPE_STR;
+    val.v.s = *uri;
+    xavp_add_value(&uri_name, &val, &record);
+
+    if (dst_uri->len > 0) {
+	val.type = SR_XTYPE_STR;
+	val.v.s = *dst_uri;
+	xavp_add_value(&dst_uri_name, &val, &record);
     }
-    return 1;
 
-uri_missing:
-	ERR("decode_branch_info: Cannot decode branch URI.\n");
-	return 0;
-}
+    if (path->len > 0) {
+	val.type = SR_XTYPE_STR;
+	val.v.s = *path;
+	xavp_add_value(&path_name, &val, &record);
+    }
+
+    if (sock_str->len > 0) {
+	val.v.s = *sock_str;
+	xavp_add_value(&sock_name, &val, &record);
+    }
+
+    val.type = SR_XTYPE_INT;
+    val.v.i = flags;
+    xavp_add_value(&flags_name, &val, &record);
+
+    val.type = SR_XTYPE_INT;
+    val.v.i = q_flag;
+    xavp_add_value(&q_flag_name, &val, &record);
+
+    if (instance->len > 0) {
+	val.type = SR_XTYPE_STR;
+	val.v.s = *instance;
+	xavp_add_value(&instance_name, &val, &record);
+    }
 
+    val.type = SR_XTYPE_XAVP;
+    val.v.xavp = record;
+    xavp_add_value(&contacts_avp, &val, NULL);
+}
 
 /* 
  * Loads contacts in destination set into contacts_avp in reverse
@@ -220,149 +149,143 @@ uri_missing:
  */
 int t_load_contacts(struct sip_msg* msg, char* key, char* value)
 {
-    str uri, tmp, dst_uri, path, branch_info, *ruri;
-    qvalue_t first_q, q;
+    branch_t *branch;
+    str *ruri, sock_str;
     struct contact *contacts, *next, *prev, *curr;
-    int_str val;
-    int first_idx, idx;
-    struct socket_info* sock;
-    unsigned int flags;
+    int first_idx, idx, len;
+    char sock_buf[MAX_SOCKET_STR];
 
     /* Check if contacts_avp has been defined */
-    if (contacts_avp.n == 0) {
-		LM_ERR("feature has been disabled - "
-		       "to enable define contacts_avp module parameter");
-		return -1;
+    if (contacts_avp.len == 0) {
+	LM_ERR("feature has been disabled - "
+	       "to enable define contacts_avp module parameter");
+	return -1;
     }
 
     /* Check if anything needs to be done */
-    if (nr_branches == 0) {
-		LM_DBG("t_load_contacts(): nothing to do - no branches!\n");
-		return 1;
-    }
-
-    ruri = (str *)0;
-	if (ruri_is_new) {
-		/* Take first q from Request-URI */
-		ruri = GET_RURI(msg);
-		if (!ruri) {
-			LM_ERR("no Request-URI found\n");
-			return -1;
-		}
-		first_q = get_ruri_q();
-		first_idx = 0;
-	} else {
-		/* Take first q from first branch */
-		uri.s = get_branch(0, &uri.len, &first_q, &dst_uri, &path, &flags,
-			               &sock);
-		first_idx = 1;
-    }
+    LM_DBG("nr_branches is %d\n", nr_branches);
 
-    /* Check if all q values are equal */
-    for(idx = first_idx; (tmp.s = get_branch(idx, &tmp.len, &q, 0, 0, 0, 0))
-			!= 0; idx++) {
-		if (q != first_q) {
-			goto rest;
-		}
+    if ((nr_branches == 0) || ((nr_branches == 1) && !ruri_is_new)) {
+	LM_DBG("nothing to do - only one contact!\n");
+	return 1;
     }
 
-    LM_DBG("t_load_contacts(): nothing to do - all contacts have same q!\n");
-    return 1;
-
-rest:
-
     /* Allocate memory for first contact */
     contacts = (struct contact *)pkg_malloc(sizeof(struct contact));
     if (!contacts) {
-		LM_ERR("no memory for contact info\n");
-		return -1;
+	LM_ERR("no memory for contact info\n");
+	return -1;
     }
 
-	if (ruri_is_new) {
-		/* Insert Request-URI branch to first contact */
-		contacts->uri.s = ruri->s;
-		contacts->uri.len = ruri->len;
-		contacts->dst_uri = msg->dst_uri;
-		contacts->sock = msg->force_send_socket;
-		getbflagsval(0, &contacts->flags);
-		contacts->path = msg->path_vec;
-	} else {
-		/* Insert first branch to first contact */
-		contacts->uri = uri;
-		contacts->dst_uri = dst_uri;
-		contacts->sock = sock;
-		contacts->flags = flags;
-		contacts->path = path;
+    if (ruri_is_new) {
+	ruri = GET_RURI(msg);
+	if (!ruri) {
+	    LM_ERR("no Request-URI found\n");
+	    return -1;
+	}	
+	/* Insert Request-URI branch to first contact */
+	contacts->uri.s = ruri->s;
+	contacts->uri.len = ruri->len;
+	contacts->dst_uri = msg->dst_uri;
+	contacts->sock = msg->force_send_socket;
+	getbflagsval(0, &contacts->flags);
+	contacts->path = msg->path_vec;
+	contacts->q = get_ruri_q();
+	contacts->instance = msg->instance;
+	first_idx = 0;
+    } else {
+	/* Insert first branch to first contact */
+	branch = get_sip_branch(0);
+	contacts->uri.s = branch->uri;
+	contacts->uri.len = branch->len;
+	contacts->dst_uri.s = branch->dst_uri;
+	contacts->dst_uri.len = branch->dst_uri_len;
+	contacts->sock = branch->force_send_socket;
+	contacts->flags = branch->flags;
+	contacts->path.s = branch->path;
+	contacts->path.len = branch->path_len;
+	contacts->q = branch->q;
+	contacts->instance.s = branch->instance;
+	contacts->instance.len = branch->instance_len;
+	first_idx = 1;
     }
-	contacts->q = first_q;
-	contacts->next = (struct contact *)0;
+
+    contacts->next = (struct contact *)0;
 
     /* Insert (remaining) branches to contact list in increasing q order */
+    for (idx = first_idx; (branch = get_sip_branch(idx)) != 0; idx++) {
 
-    for(idx = first_idx;
-		(uri.s = get_branch(idx,&uri.len,&q,&dst_uri,&path,&flags,&sock))
-			!= 0;
-		idx++ ) {
-		next = (struct contact *)pkg_malloc(sizeof(struct contact));
-		if (!next) {
-			LM_ERR("no memory for contact info\n");
-			free_contact_list(contacts);
-			return -1;
-		}
-		next->uri = uri;
-		next->q = q;
-		next->dst_uri = dst_uri;
-		next->path = path;
-		next->flags = flags;
-		next->sock = sock;
-		next->next = (struct contact *)0;
-		prev = (struct contact *)0;
-		curr = contacts;
-		while (curr && (curr->q < q)) {
-			prev = curr;
-			curr = curr->next;
-		}
-		if (!curr) {
-			next->next = (struct contact *)0;
-			prev->next = next;
-		} else {
-			next->next = curr;
-			if (prev) {
-				prev->next = next;
-			} else {
-				contacts = next;
-			}
-		}    
+	next = (struct contact *)pkg_malloc(sizeof(struct contact));
+	if (!next) {
+	    LM_ERR("no memory for contact info\n");
+	    free_contact_list(contacts);
+	    return -1;
+	}
+
+	next->uri.s = branch->uri;
+	next->uri.len = branch->len;
+	next->dst_uri.s = branch->dst_uri;
+	next->dst_uri.len = branch->dst_uri_len;
+	next->sock = branch->force_send_socket;
+	next->flags = branch->flags;
+	next->path.s = branch->path;
+	next->path.len = branch->path_len;
+	next->q = branch->q;
+	next->instance.s = branch->instance;
+	next->instance.len = branch->instance_len;
+	next->next = (struct contact *)0;
+
+	prev = (struct contact *)0;
+	curr = contacts;
+	while (curr && (curr->q < branch->q)) {
+	    prev = curr;
+	    curr = curr->next;
+	}
+	if (!curr) {
+	    next->next = (struct contact *)0;
+	    prev->next = next;
+	} else {
+	    next->next = curr;
+	    if (prev) {
+		prev->next = next;
+	    } else {
+		contacts = next;
+	    }
+	}    
     }
 
     /* Assign values for q_flags */
     curr = contacts;
     curr->q_flag = 0;
     while (curr->next) {
-		if (curr->q < curr->next->q) {
-			curr->next->q_flag = Q_FLAG;
-		} else {
-			curr->next->q_flag = 0;
-		}
-		curr = curr->next;
+	if (curr->q < curr->next->q) {
+	    curr->next->q_flag = Q_FLAG;
+	} else {
+	    curr->next->q_flag = 0;
+	}
+	curr = curr->next;
     }
 
     /* Add contacts to contacts_avp */
     curr = contacts;
     while (curr) {
-		if (encode_branch_info(&branch_info, curr) == 0) {
-			LM_ERR("encoding of branch info failed\n");
-			free_contact_list(contacts);
-			if (branch_info.s) pkg_free(branch_info.s);
-			return -1;
-		}
-		val.s = branch_info;
-		add_avp(contacts_avp_type|AVP_VAL_STR|(curr->q_flag),
-				contacts_avp, val);
-		pkg_free(branch_info.s);
-		LM_DBG("loaded contact <%.*s> with q_flag <%d>\n",
-		       STR_FMT(&val.s), curr->q_flag);
-		curr = curr->next;
+
+	if (curr->sock) {
+	    len = MAX_SOCKET_STR - 1;
+	    if (socket2str(sock_buf, &len, curr->sock) < 0) {
+		LM_ERR("failed to convert socket to str\n");
+		return -1;
+	    }
+	    sock_buf[len] = 0;
+	    sock_str.s = sock_buf;
+	    sock_str.len = len + 1;
+	}
+
+	add_contacts_avp(&(curr->uri), &(curr->dst_uri), &(curr->path),
+			 &sock_str, curr->flags, curr->q_flag,
+			 &(curr->instance));
+
+	curr = curr->next;
     }
 
     /* Clear all branches */
@@ -374,90 +297,515 @@ rest:
     return 1;
 }
 
+void add_contact_flows_avp(str *uri, str *dst_uri, str *path, str *sock_str,
+			   unsigned int flags, str *instance)
+{
+    sr_xavp_t *record;
+    sr_xval_t val;
+
+    record = NULL;
+
+    val.type = SR_XTYPE_STR;
+    val.v.s = *uri;
+    xavp_add_value(&uri_name, &val, &record);
+
+    if (dst_uri->len > 0) {
+	val.type = SR_XTYPE_STR;
+	val.v.s = *dst_uri;
+	xavp_add_value(&dst_uri_name, &val, &record);
+    }
+
+    if (path->len > 0) {
+	val.type = SR_XTYPE_STR;
+	val.v.s = *path;
+	xavp_add_value(&path_name, &val, &record);
+    }
+
+    if (sock_str->len > 0) {
+	val.v.s = *sock_str;
+	xavp_add_value(&sock_name, &val, &record);
+    }
+
+    if (instance->len > 0) {
+	val.type = SR_XTYPE_STR;
+	val.v.s = *instance;
+	xavp_add_value(&instance_name, &val, &record);
+    }
+
+    val.type = SR_XTYPE_INT;
+    val.v.i = flags;
+    xavp_add_value(&flags_name, &val, &record);
+
+    val.type = SR_XTYPE_XAVP;
+    val.v.xavp = record;
+    xavp_add_value(&contact_flows_avp, &val, NULL);
+}
 
 /*
- * Adds to request a new destination set that includes all highest
- * priority class contacts in contacts_avp.   Request URI is rewritten with
- * first contact and the remaining contacts (if any) are added as branches.
- * Removes used contacts from contacts_avp.  Returns 1, if contacts_avp
- * was not empty and a destination set was successfully added.  Returns -2,
- * if contacts_avp was empty and thus there was nothing to do.
- * Returns -1 in case of an error. */
+ * Adds to request a new destination set that includes highest
+ * priority class contacts in contacts_avp, but only one contact with same
+ * +sip.instance value is included.  Others are added to contact_flows_avp
+ * for later consumption by next_contact_flows().
+ * Request URI is rewritten with first contact and the remaining contacts
+ * (if any) are added as branches. Removes all highest priority contacts
+ * from contacts_avp.
+ * Returns 1, if contacts_avp was not empty and a destination set was
+ * successfully added.  Returns -2, if contacts_avp was empty and thus
+ * there was nothing to do. Returns -1 in case of an error. */
 int t_next_contacts(struct sip_msg* msg, char* key, char* value)
 {
-    struct usr_avp *avp, *prev;
-    int_str val;
-    str uri, dst, path;
+    str uri, dst_uri, path, instance, host, sock_str;
     struct socket_info *sock;
-    unsigned int flags;
-    struct search_state st;
+    unsigned int flags, q_flag;
+    sr_xavp_t *xavp_list, *xavp, *prev_xavp, *vavp;
+    int port, proto;
+    struct instance_list *il, *ilp;
 
     /* Check if contacts_avp has been defined */
-    if (contacts_avp.n == 0) {
-		LM_ERR("feature has been disabled - "
-			   "to enable define contacts_avp module parameter");
-		return -1;
+    if (contacts_avp.len == 0) {
+	LM_ERR("feature has been disabled - "
+	       "to enable define contacts_avp module parameter");
+	return -1;
     }
 
     /* Load Request-URI and branches */
 
     /* Find first contacts_avp value */
-    avp = search_first_avp(contacts_avp_type, contacts_avp, &val, &st);
-    if (!avp) {
-	LM_DBG("no AVPs - we are done!\n");
+    xavp_list = xavp_get(&contacts_avp, NULL);
+    if (!xavp_list) {
+	LM_DBG("no contacts in contacts_avp - we are done!\n");
 	return -2;
     }
 
-    LM_DBG("next contact is <%.*s>\n", STR_FMT(&val.s));
+    xavp = xavp_list;
 
-    if (decode_branch_info(val.s.s, &uri, &dst, &path, &sock, &flags)
-	== 0) {
-	LM_ERR("decoding of branch info <%.*s> failed\n", STR_FMT(&val.s));
-	destroy_avp(avp);
-	return -1;
+    vavp = xavp_get(&uri_name, xavp->val.v.xavp);
+    uri = vavp->val.v.s;
+
+    vavp = xavp_get(&dst_uri_name, xavp->val.v.xavp);
+    if (vavp != NULL) {
+	dst_uri = vavp->val.v.s;
+    } else {
+	dst_uri.s = 0;
+	dst_uri.len = 0;
+    }
+
+    vavp = xavp_get(&path_name, xavp->val.v.xavp);
+    if (vavp != NULL) {
+	path = vavp->val.v.s;
+    } else {
+	path.s = 0;
+	path.len = 0;
+    }
+
+    vavp = xavp_get(&sock_name, xavp->val.v.xavp);
+    if (vavp != NULL) {
+	sock_str.s = vavp->val.v.s.s;
+	if (parse_phostport(sock_str.s, &host.s, &host.len, &port, &proto)
+	    != 0) {
+	    LM_ERR("parsing of socket info <%s> failed\n", sock_str.s);
+	    xavp_destroy_list(&xavp_list);
+	    return -1;
+	}
+	sock = grep_sock_info(&host, (unsigned short)port,
+			      (unsigned short)proto);
+	if (sock == 0) {
+	    xavp_destroy_list(&xavp_list);
+	    return -1;
+	}
+    } else {
+	sock = NULL;
+    }
+
+    vavp = xavp_get(&flags_name, xavp->val.v.xavp);
+    flags = vavp->val.v.i;
+
+    vavp = xavp_get(&q_flag_name, xavp->val.v.xavp);
+    q_flag = vavp->val.v.i;
+
+    vavp = xavp_get(&instance_name, xavp->val.v.xavp);
+    il = (struct instance_list *)0;
+    if ((vavp != NULL) && !q_flag) {
+	instance = vavp->val.v.s;
+	il = (struct instance_list *)pkg_malloc(sizeof(struct instance_list));
+	if (!il) {
+	    LM_ERR("no memory for instance list entry\n");
+	    return -1;
+	}
+	il->instance.s = pkg_malloc(instance.len);
+	if (!il->instance.s) {
+	    pkg_free(il);
+	    LM_ERR("no memory for instance list instance\n");
+	    return -1;
+	}
+	il->instance.len = instance.len;
+	memcpy(il->instance.s, instance.s, instance.len);
+	il->next = (struct instance_list *)0;
     }
-    
+
     /* Rewrite Request-URI */
     rewrite_uri(msg, &uri);
-    if (dst.s && dst.len) set_dst_uri(msg, &dst);
-    else reset_dst_uri(msg);
-    if (path.s && path.len) set_path_vector(msg, &path);
-    else reset_path_vector(msg);
+
+    if (dst_uri.len) {
+	set_dst_uri(msg, &dst_uri);
+    } else {
+	reset_dst_uri(msg);
+    }
+
+    if (path.len) {
+	set_path_vector(msg, &path);
+    } else {
+	reset_path_vector(msg);
+    }
+
     set_force_socket(msg, sock);
+
     setbflagsval(0, flags);
 
-    if (avp->flags & Q_FLAG) {
-	destroy_avp(avp);
+    /* Check if there was only one contact at this priority */
+    if (q_flag) {
+	xavp_rm(xavp, NULL);
 	return 1;
     }
 		
     /* Append branches until out of branches or Q_FLAG is set */
-    prev = avp;
-    while ((avp = search_next_avp(&st, &val))) {
-	destroy_avp(prev);
-	LM_DBG("next contact is <%.*s>\n", STR_FMT(&val.s));
-	
-	if (decode_branch_info(val.s.s, &uri, &dst, &path, &sock, &flags)
-	    == 0) {
-	    LM_ERR("decoding of branch info <%.*s> failed\n", STR_FMT(&val.s));
-	    destroy_avp(avp);
-	    return -1;
+    /* If a branch has same instance value as some previous branch, */
+    /* instead of appending it, add it to contact_flows_avp */
+
+    xavp_rm_by_name(&contact_flows_avp, 1, NULL);
+    prev_xavp = xavp;
+
+    while ((xavp = xavp_get_next(prev_xavp)) != NULL) {
+
+	xavp_rm(prev_xavp, NULL);
+
+	vavp = xavp_get(&q_flag_name, xavp->val.v.xavp);
+	q_flag = vavp->val.v.i;
+
+	vavp = xavp_get(&uri_name, xavp->val.v.xavp);
+	uri = vavp->val.v.s;
+
+	vavp = xavp_get(&dst_uri_name, xavp->val.v.xavp);
+	if (vavp != NULL) {
+	    dst_uri = vavp->val.v.s;
+	} else {
+	    dst_uri.len = 0;
+	}
+
+	vavp = xavp_get(&path_name, xavp->val.v.xavp);
+	if (vavp != NULL) {
+	    path = vavp->val.v.s;
+	} else {
+	    path.len = 0;
+	}
+
+	vavp = xavp_get(&sock_name, xavp->val.v.xavp);
+	if (vavp != NULL) {
+	    sock_str = vavp->val.v.s;
+	    if (parse_phostport(sock_str.s, &host.s, &host.len, &port, &proto)
+		!= 0) {
+		LM_ERR("parsing of socket info <%s> failed\n", sock_str.s);
+		free_instance_list(il);
+		xavp_destroy_list(&xavp_list);
+		return -1;
+	    }
+	    sock = grep_sock_info(&host, (unsigned short)port,
+				  (unsigned short)proto);
+	    if (sock == 0) {
+		free_instance_list(il);
+		xavp_destroy_list(&xavp_list);
+		return -1;
+	    }
+	} else {
+	    sock = NULL;
+	}
+
+	vavp = xavp_get(&flags_name, xavp->val.v.xavp);
+	flags = vavp->val.v.i;
+
+	vavp = xavp_get(&instance_name, xavp->val.v.xavp);
+	if (vavp != NULL) {
+	    instance = vavp->val.v.s;
+	    ilp = il;
+	    while (ilp) {
+		if ((instance.len == ilp->instance.len) &&
+		    (strncmp(instance.s, ilp->instance.s, instance.len) == 0))
+		    break;
+		ilp = ilp->next;
+	    }
+	    if (ilp) {
+		add_contact_flows_avp(&uri, &dst_uri, &path, &sock_str,
+				      flags, &instance);
+		goto check_q_flag;
+	    }
+	    if (!q_flag) {
+		ilp = (struct instance_list *)
+		    pkg_malloc(sizeof(struct instance_list));
+		if (!ilp) {
+		    LM_ERR("no memory for instance list element\n");
+		    free_instance_list(il);
+		    return -1;
+		}
+		ilp->instance.s = pkg_malloc(instance.len);
+		if (!ilp->instance.s) {
+		    LM_ERR("no memory for instance list instance\n");
+		    pkg_free(ilp);
+		    free_instance_list(il);
+		    return -1;
+		}
+		ilp->instance.len = instance.len;
+		memcpy(ilp->instance.s, instance.s, instance.len);
+		ilp->next = il;
+		il = ilp;
+	    }
 	}
 
-	if (append_branch(msg, &uri, &dst, &path, 0, flags, sock, 0, 0) != 1) {
+	if (append_branch(msg, &uri, &dst_uri, &path, 0, flags, sock, 0, 0)
+	    != 1) {
 	    LM_ERR("appending branch failed\n");
-	    destroy_avp(avp);
+	    free_instance_list(il);
+	    xavp_destroy_list(&xavp_list);
 	    return -1;
 	}
 
-	if (avp->flags & Q_FLAG) {
-	    destroy_avp(avp);
+    check_q_flag:
+	if (q_flag) {
+	    free_instance_list(il);
+	    xavp_rm(xavp, NULL);
 	    return 1;
 	}
-	prev = avp;
+
+	prev_xavp = xavp;
+    }
+
+    free_instance_list(il);
+    xavp_rm(prev_xavp, NULL);
+
+    return 1;
+}
+
+/*
+ * Adds to request a new destination set that includes contacts
+ * from contact_flows_avp.  Only one contact with same +sip.instance
+ * value is included.
+ * Request URI is rewritten with first contact and the remaining contacts
+ * (if any) are added as branches. Removes all used contacts
+ * from contacts_avp.
+ * Returns 1, if contact_flows_avp was not empty and a destination set was
+ * successfully added.  Returns -2, if contact_flows_avp was empty and thus
+ * there was nothing to do. Returns -1 in case of an error. */
+int t_next_contact_flows(struct sip_msg* msg, char* key, char* value)
+{
+    str uri, dst_uri, path, instance, host;
+    struct socket_info *sock;
+    unsigned int flags;
+    sr_xavp_t *xavp_list, *xavp, *next_xavp, *vavp;
+    char *tmp;
+    int port, proto;
+    struct instance_list *il, *ilp;
+
+    /* Check if contact_flows_avp has been defined */
+    if (contact_flows_avp.len == 0) {
+	LM_ERR("feature has been disabled - "
+	       "to enable define contact_flows_avp module parameter");
+	return -1;
+    }
+
+    /* Load Request-URI and branches */
+
+    /* Find first contact_flows_avp value */
+    xavp_list = xavp_get(&contact_flows_avp, NULL);
+    if (!xavp_list) {
+	LM_DBG("no contacts in contact_flows_avp - we are done!\n");
+	return -2;
+    }
+
+    xavp = xavp_list;
+    next_xavp = xavp_get_next(xavp);
+
+    vavp = xavp_get(&uri_name, xavp->val.v.xavp);
+    uri = vavp->val.v.s;
+
+    vavp = xavp_get(&dst_uri_name, xavp->val.v.xavp);
+    if (vavp != NULL) {
+	dst_uri = vavp->val.v.s;
+    } else {
+	dst_uri.s = 0;
+	dst_uri.len = 0;
+    }
+
+    vavp = xavp_get(&path_name, xavp->val.v.xavp);
+    if (vavp != NULL) {
+	path = vavp->val.v.s;
+    } else {
+	path.s = 0;
+	path.len = 0;
+    }
+
+    vavp = xavp_get(&sock_name, xavp->val.v.xavp);
+    if (vavp != NULL) {
+	tmp = vavp->val.v.s.s;
+	if (parse_phostport(tmp, &host.s, &host.len, &port, &proto) != 0) {
+	    LM_ERR("parsing of socket info <%s> failed\n", tmp);
+	    xavp_destroy_list(&xavp_list);
+	    return -1;
+	}
+	sock = grep_sock_info(&host, (unsigned short)port,
+			      (unsigned short)proto);
+	if (sock == 0) {
+	    xavp_destroy_list(&xavp_list);
+	    return -1;
+	}
+    } else {
+	sock = NULL;
+    }
+
+    vavp = xavp_get(&flags_name, xavp->val.v.xavp);
+    flags = vavp->val.v.i;
+
+    vavp = xavp_get(&instance_name, xavp->val.v.xavp);
+    il = (struct instance_list *)0;
+    if ((vavp != NULL) && next_xavp) {
+	instance = vavp->val.v.s;
+	il = (struct instance_list *)pkg_malloc(sizeof(struct instance_list));
+	if (!il) {
+	    LM_ERR("no memory for instance list entry\n");
+	    return -1;
+	}
+	il->instance.s = pkg_malloc(instance.len);
+	if (!il->instance.s) {
+	    pkg_free(il);
+	    LM_ERR("no memory for instance list instance\n");
+	    return -1;
+	}
+	il->instance.len = instance.len;
+	memcpy(il->instance.s, instance.s, instance.len);
+	il->next = (struct instance_list *)0;
+    }
+
+    /* Rewrite Request-URI */
+    rewrite_uri(msg, &uri);
+
+    if (dst_uri.len) {
+	set_dst_uri(msg, &dst_uri);
+    } else {
+	reset_dst_uri(msg);
+    }
+
+    if (path.len) {
+	set_path_vector(msg, &path);
+    } else {
+	reset_path_vector(msg);
+    }
+
+    set_force_socket(msg, sock);
+
+    setbflagsval(0, flags);
+
+    /* Append branches until out of branches. */
+    /* Do not include a branch that has same instance value as some */
+    /* previous branch. */
+
+    xavp_rm(xavp, NULL);
+    xavp = next_xavp;
+
+    while (xavp) {
+	
+	next_xavp = xavp_get_next(xavp);
+
+	vavp = xavp_get(&instance_name, xavp->val.v.xavp);
+	if (vavp != NULL) {
+	    instance = vavp->val.v.s;
+	    ilp = il;
+	    while (ilp) {
+		if ((instance.len == ilp->instance.len) &&
+		    (strncmp(instance.s, ilp->instance.s, instance.len) == 0))
+		    break;
+		ilp = ilp->next;
+	    }
+	    if (ilp) {
+		/* skip already appended instance */
+		xavp = next_xavp;
+		continue;
+	    }
+	    if (next_xavp) {
+		ilp = (struct instance_list *)
+		pkg_malloc(sizeof(struct instance_list));
+		if (!ilp) {
+		    LM_ERR("no memory for new instance list entry\n");
+		    free_instance_list(il);
+		    return -1;
+		}
+		ilp->instance.s = pkg_malloc(instance.len);
+		if (!ilp->instance.s) {
+		    pkg_free(il);
+		    LM_ERR("no memory for instance list instance\n");
+		    return -1;
+		}
+		ilp->instance.len = instance.len;
+		memcpy(ilp->instance.s, instance.s, instance.len);
+		ilp->next = il;
+		il = ilp;
+	    } else {
+		LM_ERR("instance missing from contact_flow_avp contact\n");
+		free_instance_list(il);
+		return -1;
+	    }
+	}
+
+	vavp = xavp_get(&uri_name, xavp->val.v.xavp);
+	uri = vavp->val.v.s;
+
+	vavp = xavp_get(&dst_uri_name, xavp->val.v.xavp);
+	if (vavp != NULL) {
+	    dst_uri = vavp->val.v.s;
+	} else {
+	    dst_uri.len = 0;
+	}
+
+	vavp = xavp_get(&path_name, xavp->val.v.xavp);
+	if (vavp != NULL) {
+	    path = vavp->val.v.s;
+	} else {
+	    path.len = 0;
+	}
+
+	vavp = xavp_get(&sock_name, xavp->val.v.xavp);
+	if (vavp != NULL) {
+	    tmp = vavp->val.v.s.s;
+	    if (parse_phostport(tmp, &host.s, &host.len, &port, &proto) != 0) {
+		LM_ERR("parsing of socket info <%s> failed\n", tmp);
+		free_instance_list(il);
+		xavp_destroy_list(&xavp_list);
+		return -1;
+	    }
+	    sock = grep_sock_info(&host, (unsigned short)port,
+				  (unsigned short)proto);
+	    if (sock == 0) {
+		free_instance_list(il);
+		xavp_destroy_list(&xavp_list);
+		return -1;
+	    }
+	} else {
+	    sock = NULL;
+	}
+
+	vavp = xavp_get(&flags_name, xavp->val.v.xavp);
+	flags = vavp->val.v.i;
+
+	if (append_branch(msg, &uri, &dst_uri, &path, 0, flags, sock, 0, 0)
+	    != 1) {
+	    LM_ERR("appending branch failed\n");
+	    free_instance_list(il);
+	    xavp_destroy_list(&xavp_list);
+	    return -1;
+	}
+
+	xavp_rm(xavp, NULL);
+	xavp = next_xavp;
     }
 
-    destroy_avp(prev);
+    free_instance_list(il);
 
     return 1;
 }

+ 2 - 0
modules/tm/t_serial.h

@@ -33,3 +33,5 @@ extern int fr_inv_timer_next;
 int t_load_contacts(struct sip_msg* msg, char* key, char* value);
 
 int t_next_contacts(struct sip_msg* msg, char* key, char* value);
+
+int t_next_contact_flows(struct sip_msg* msg, char* key, char* value);

+ 13 - 4
modules/tm/tm.c

@@ -306,7 +306,9 @@ static int w_t_is_set(struct sip_msg* msg, char* target, char* bar);
  * searched for nothing each time a new transaction is created */
 static char *fr_timer_param = 0 /*FR_TIMER_AVP*/;
 static char *fr_inv_timer_param = 0 /*FR_INV_TIMER_AVP*/;
-static char *contacts_avp_param = 0;
+
+str contacts_avp = {0, 0};
+str contact_flows_avp = {0, 0};
 
 int tm_remap_503_500 = 1;
 
@@ -473,6 +475,8 @@ static cmd_export_t cmds[]={
 			REQUEST_ROUTE | FAILURE_ROUTE},
 	{"t_next_contacts", t_next_contacts,            0, 0,
 			REQUEST_ROUTE | FAILURE_ROUTE},
+	{"t_next_contact_flows", t_next_contact_flows,            0, 0,
+			REQUEST_ROUTE | FAILURE_ROUTE},
 
 	/* not applicable from the script */
 	{"load_tm",            (cmd_function)load_tm,           NO_SCRIPT,   0, 0},
@@ -519,7 +523,8 @@ static param_export_t params[]={
 	{"cancel_b_method",     PARAM_INT, &default_tm_cfg.cancel_b_flags},
 	{"reparse_on_dns_failover", PARAM_INT, &default_tm_cfg.reparse_on_dns_failover},
 	{"on_sl_reply",         PARAM_STRING|PARAM_USE_FUNC, fixup_on_sl_reply   },
-	{"contacts_avp",        PARAM_STRING, &contacts_avp_param                },
+	{"contacts_avp",        PARAM_STR, &contacts_avp                },
+	{"contact_flows_avp",   PARAM_STR, &contact_flows_avp           },
 	{"disable_6xx_block",   PARAM_INT, &default_tm_cfg.disable_6xx           },
 	{"local_ack_mode",      PARAM_INT, &default_tm_cfg.local_ack_mode        },
 	{"failure_reply_mode",  PARAM_INT, &failure_reply_mode                   },
@@ -840,11 +845,15 @@ static int mod_init(void)
 		return -1;
 	}
 
-	if (init_avp_params( fr_timer_param, fr_inv_timer_param,
-						 contacts_avp_param)<0 ){
+	if (init_avp_params(fr_timer_param, fr_inv_timer_param) < 0) {
 		LOG(L_ERR,"ERROR:tm:mod_init: failed to process AVP params\n");
 		return -1;
 	}
+	if ((contacts_avp.len > 0) && (contact_flows_avp.len == 0)) {
+	    LOG(L_ERR,"ERROR:tm:mod_init: contact_flows_avp param has not been defined\n");
+	    return -1;
+	}
+
 #ifdef WITH_EVENT_LOCAL_REQUEST
 	goto_on_local_req=route_lookup(&event_rt, "tm:local-request");
 	if (goto_on_local_req>=0 && event_rt.rlist[goto_on_local_req]==0)