Browse Source

dmq_usrloc: sync with multi contacts per message

Julien Chavanton 8 năm trước cách đây
mục cha
commit
f51f4df75c

+ 7 - 0
src/modules/dmq_usrloc/dmq_usrloc.c

@@ -37,6 +37,7 @@ static int child_init(int);
 int dmq_usrloc_enable = 0;
 int _dmq_usrloc_sync = 1;
 int _dmq_usrloc_batch_size = 0;
+int _dmq_usrloc_batch_msg_contacts = 1;
 int _dmq_usrloc_batch_usleep = 0;
 str _dmq_usrloc_domain = str_init("location");
 
@@ -47,6 +48,7 @@ MODULE_VERSION
 static param_export_t params[] = {
 	{"enable", INT_PARAM, &dmq_usrloc_enable},
 	{"sync",   INT_PARAM, &_dmq_usrloc_sync},
+	{"batch_msg_contacts",   INT_PARAM, &_dmq_usrloc_batch_msg_contacts},
 	{"batch_size",   INT_PARAM, &_dmq_usrloc_batch_size},
 	{"batch_usleep", INT_PARAM, &_dmq_usrloc_batch_usleep},
 	{"usrloc_domain", PARAM_STR, &_dmq_usrloc_domain},
@@ -74,6 +76,11 @@ static int mod_init(void)
 	bind_usrloc_t bind_usrloc;
 	LM_INFO("dmq usrloc replication mode = %d\n", dmq_usrloc_enable);
 
+	if(_dmq_usrloc_batch_msg_contacts > 150) {
+		LM_ERR("batch_msg_contacts too high[%d] setting to [150]\n", _dmq_usrloc_batch_msg_contacts);
+		_dmq_usrloc_batch_msg_contacts = 150;
+	}
+
 	if (dmq_usrloc_enable) {
 
 		bind_usrloc = (bind_usrloc_t)find_export("ul_bind_usrloc", 1, 0);

+ 15 - 0
src/modules/dmq_usrloc/doc/dmq_usrloc.xml

@@ -39,6 +39,21 @@
 	<copyright>
 	    <year>2014</year>
 	</copyright>
+	    <editor>
+		<firstname>Julien</firstname>
+		<surname>Chavanton</surname>
+		<affiliation><orgname>flowroute.com</orgname></affiliation>
+		<email>[email protected]</email>
+		<address>
+		<otheraddr>
+		<ulink></ulink>
+		</otheraddr>
+		</address>
+	    </editor>
+	</authorgroup>
+	<copyright>
+	    <year>2017</year>
+	</copyright>
     </bookinfo>
     <toc></toc>
     

+ 27 - 0
src/modules/dmq_usrloc/doc/dmq_usrloc_admin.xml

@@ -174,6 +174,33 @@ modparam("dmq_usrloc", "batch_size", 4000)
 ...
 modparam("dmq_usrloc", "batch_usleep", 1000)
 ...
+</programlisting>
+	</example>
+	</section>
+	<section id="usrloc_dmq.p.batch_msg_contacts">
+		<title><varname>batch_msg_contacts</varname> (int)</title>
+		<para>
+			The parameter controls the amount of contact per message/transaction during a sync
+
+			Note that the default maximum size of a contact is 1024, however once converted in the json format
+			we can add 200 characters to reach 1224 (this is only an approximation).
+			Considering 65536 (the maximum datagram size)/1224 = 53,
+			we need to leave some space for UDP and SIP headers, staying &lt;= 50 seems safe in most cases
+		<para>
+		<emphasis>
+			Default value is 1.
+			Maximum value is 150.
+		</emphasis>
+		</para>
+		<example>
+		<title>Set <varname>batch_msg_contacts</varname> parameter</title>
+		<programlisting format="linespecific">
+...
+modparam("dmq_usrloc", "batch_msg_contacts", 50)  # 50 contacts / message
+modparam("dmq_usrloc", "batch_size", 10000)       # 10000 contacts / batch
+modparam("dmq_usrloc", "batch_usleep", 500000)    # one batch every 500ms
+# syncing 20K contacts/second with 50 contacts/message
+...
 </programlisting>
 	</example>
 	</section>

+ 203 - 61
src/modules/dmq_usrloc/usrloc_sync.c

@@ -18,6 +18,9 @@
  * along with this program; if not, write to the Free Software
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
  *
+ * Edited :
+ *
+ * Copyright (C) 2017 Julien Chavanton, Flowroute
  */
 
 #include "usrloc_sync.h"
@@ -41,10 +44,13 @@ dmq_resp_cback_t usrloc_dmq_resp_callback = {&usrloc_dmq_resp_callback_f, 0};
 int usrloc_dmq_send_all();
 int usrloc_dmq_request_sync();
 int usrloc_dmq_send_contact(ucontact_t* ptr, str aor, int action, dmq_node_t* node);
+int usrloc_dmq_send_multi_contact(ucontact_t* ptr, str aor, int action, dmq_node_t* node);
+void usrloc_dmq_send_multi_contact_flush(dmq_node_t* node);
 
 #define MAX_AOR_LEN 256
 
 extern int _dmq_usrloc_sync;
+extern int _dmq_usrloc_batch_msg_contacts;
 extern int _dmq_usrloc_batch_size;
 extern int _dmq_usrloc_batch_usleep;
 extern str _dmq_usrloc_domain;
@@ -240,7 +246,11 @@ void usrloc_get_all_ucontact(dmq_node_t* node)
 		LM_DBG("- AoR: %.*s  AoRhash=%d  Flags=%d\n", aor.len, aor.s, aorhash, flags);
 
 		while (ptr) {
-			usrloc_dmq_send_contact(ptr, aor, DMQ_UPDATE, node);
+			if (_dmq_usrloc_batch_msg_contacts >1) {
+				usrloc_dmq_send_multi_contact(ptr, aor, DMQ_UPDATE, node);
+			} else {
+				usrloc_dmq_send_contact(ptr, aor, DMQ_UPDATE, node);
+			}
 			n++;
 			ptr = ptr->next;
 		}
@@ -254,6 +264,7 @@ void usrloc_get_all_ucontact(dmq_node_t* node)
 		}
 	}
 	dmq_usrloc_free(buf);
+	usrloc_dmq_send_multi_contact_flush(node); // send any remaining contacts
 
 done:
 	c.s = ""; c.len = 0;
@@ -307,69 +318,17 @@ int usrloc_dmq_send(str* body, dmq_node_t* node) {
 	return 0;
 }
 
-
-/**
- * @brief ht dmq callback
- */
-int usrloc_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* node)
-{
-	int content_length;
-	str body;
-	srjson_doc_t jdoc;
-	srjson_t *it = NULL;
+inline static int usrloc_dmq_execute_action(srjson_t *jdoc_action, dmq_node_t* node) {
 	static ucontact_info_t ci;
-
-	unsigned int action, expires, cseq, flags, cflags, q, last_modified,
-				 methods, reg_id;
-	str aor=STR_NULL, ruid=STR_NULL, c=STR_NULL, received=STR_NULL,
-		path=STR_NULL, callid=STR_NULL, user_agent=STR_NULL, instance=STR_NULL;
+	srjson_t *it = NULL;
+	unsigned int action, expires, cseq, flags, cflags, q, last_modified, methods, reg_id;
+	str aor=STR_NULL, ruid=STR_NULL, c=STR_NULL, received=STR_NULL, path=STR_NULL,
+		callid=STR_NULL, user_agent=STR_NULL, instance=STR_NULL;
 
 	action = expires = cseq = flags = cflags = q = last_modified
 		= methods = reg_id = 0;
 
-	srjson_InitDoc(&jdoc, NULL);
-	if(parse_from_header(msg)<0) {
-		LM_ERR("failed to parse from header\n");
-		goto invalid;
-	}
-	body = ((struct to_body*)msg->from->parsed)->uri;
-
-	LM_DBG("dmq message received from %.*s\n", body.len, body.s);
-
-	if(parse_headers(msg, HDR_EOH_F, 0)<0) {
-		LM_ERR("failed to parse the headers\n");
-		goto invalid;
-	}
-	if(!msg->content_length) {
-		LM_ERR("no content length header found\n");
-		goto invalid;
-	}
-	content_length = get_content_length(msg);
-	if(!content_length) {
-		LM_DBG("content length is 0\n");
-		goto invalid;
-	}
-
-	body.s = get_body(msg);
-	body.len = content_length;
-
-	if (!body.s) {
-		LM_ERR("unable to get body\n");
-		goto error;
-	}
-
-	jdoc.buf = body;
-	if(jdoc.root == NULL) {
-		jdoc.root = srjson_Parse(&jdoc, jdoc.buf.s);
-		if(jdoc.root == NULL)
-		{
-			LM_ERR("invalid json doc [[%s]]\n", jdoc.buf.s);
-			goto invalid;
-		}
-	}
-
-	for(it=jdoc.root->child; it; it = it->next)
-	{
+	for(it=jdoc_action; it; it = it->next) {
 		if (it->string == NULL) continue;
 
 		if (strcmp(it->string, "action")==0) {
@@ -398,7 +357,7 @@ int usrloc_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t*
 		} else if (strcmp(it->string, "instance")==0) {
 			instance.s = it->valuestring;
 			instance.len = strlen(instance.s);
-		} else if (strcmp(it->string, "expires")==0) { //
+		} else if (strcmp(it->string, "expires")==0) {
 			expires = SRJSON_GET_UINT(it);
 		} else if (strcmp(it->string, "cseq")==0) {
 			cseq = SRJSON_GET_UINT(it);
@@ -454,7 +413,71 @@ int usrloc_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t*
 			LM_DBG("Received DMQ_NONE. Not used...\n");
 			break;
 		default:
+			return 0;
+	}
+	return 1;
+}
+
+
+/**
+ * @brief ht dmq callback
+ */
+int usrloc_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* node)
+{
+	int content_length;
+	str body;
+	srjson_doc_t jdoc;
+
+	srjson_InitDoc(&jdoc, NULL);
+	if (parse_from_header(msg)<0) {
+		LM_ERR("failed to parse from header\n");
+		goto invalid;
+	}
+	body = ((struct to_body*)msg->from->parsed)->uri;
+
+	LM_DBG("dmq message received from %.*s\n", body.len, body.s);
+
+	if (parse_headers(msg, HDR_EOH_F, 0)<0) {
+		LM_ERR("failed to parse the headers\n");
+		goto invalid;
+	}
+	if (!msg->content_length) {
+		LM_ERR("no content length header found\n");
+		goto invalid;
+	}
+	content_length = get_content_length(msg);
+	if (!content_length) {
+		LM_DBG("content length is 0\n");
+		goto invalid;
+	}
+
+	body.s = get_body(msg);
+	body.len = content_length;
+
+	if (!body.s) {
+		LM_ERR("unable to get body\n");
+		goto error;
+	}
+
+	jdoc.buf = body;
+	if (jdoc.root == NULL) {
+		jdoc.root = srjson_Parse(&jdoc, jdoc.buf.s);
+		if (jdoc.root == NULL) {
+			LM_ERR("invalid json doc [[%s]]\n", jdoc.buf.s);
 			goto invalid;
+		}
+	}
+
+	if (strcmp(jdoc.root->child->string, "multi")==0) {
+		LM_DBG("request [%s]\n", jdoc.root->child->string);
+		srjson_t *jdoc_actions = jdoc.root->child->child;
+		srjson_t *it = NULL;
+		for(it=jdoc_actions; it; it = it->next) {
+			LM_DBG("action [%s]\n", jdoc_actions->child->string);
+			if (!usrloc_dmq_execute_action(it->child, node)) goto invalid;
+		}
+	} else {
+		if (!usrloc_dmq_execute_action(jdoc.root->child, node)) goto invalid;
 	}
 
 	srjson_DestroyDoc(&jdoc);
@@ -517,6 +540,125 @@ error:
 	return -1;
 }
 
+/* while prt append json string
+ * */
+
+/* Multi contacts */
+typedef struct jdoc_contact_group {
+	int count;
+	srjson_doc_t jdoc;
+	srjson_t *jdoc_contacts;
+} jdoc_contact_group_t;
+
+static jdoc_contact_group_t jdoc_contact_group;
+
+static void usrloc_dmq_contacts_group_init(void) {
+	if (jdoc_contact_group.jdoc.root)
+		return;
+	jdoc_contact_group.count = 0;
+	srjson_InitDoc(&jdoc_contact_group.jdoc, NULL);
+	LM_DBG("init multi contacts batch. \n");
+	jdoc_contact_group.jdoc.root = srjson_CreateObject(&jdoc_contact_group.jdoc);
+	if (jdoc_contact_group.jdoc.root==NULL)
+		LM_ERR("cannot create json root ! \n");
+	jdoc_contact_group.jdoc_contacts = srjson_CreateObject(&jdoc_contact_group.jdoc);
+	if (jdoc_contact_group.jdoc_contacts==NULL) {
+		LM_ERR("cannot create json contacts ! \n");
+		srjson_DestroyDoc(&jdoc_contact_group.jdoc);
+	}
+}
+
+inline static void usrloc_dmq_contacts_group_send(dmq_node_t* node) {
+	if (jdoc_contact_group.count == 0)
+		return;
+	srjson_doc_t *jdoc = &jdoc_contact_group.jdoc;
+	srjson_t *jdoc_contacts = jdoc_contact_group.jdoc_contacts;
+
+	srjson_AddItemToObject(jdoc, jdoc->root, "multi", jdoc_contacts);
+
+	LM_DBG("json[%s]\n", srjson_Print(jdoc, jdoc->root));
+	jdoc->buf.s = srjson_PrintUnformatted(jdoc, jdoc->root);
+	if(jdoc->buf.s==NULL) {
+		LM_ERR("unable to serialize data\n");
+		goto error;
+	}
+	jdoc->buf.len = strlen(jdoc->buf.s);
+
+	LM_DBG("sending serialized data %.*s\n", jdoc->buf.len, jdoc->buf.s);
+	if (usrloc_dmq_send(&jdoc->buf, node)!=0) {
+		LM_ERR("unable to send data, sleeping 100ms\n");
+		goto error;
+	}
+
+	jdoc->free_fn(jdoc->buf.s);
+	jdoc->buf.s = NULL;
+	srjson_DestroyDoc(jdoc);
+	return;
+
+error:
+	if(jdoc->buf.s!=NULL) {
+		jdoc->free_fn(jdoc->buf.s);
+		jdoc->buf.s = NULL;
+	}
+	srjson_DestroyDoc(jdoc);
+	return;
+}
+
+void usrloc_dmq_send_multi_contact_flush(dmq_node_t* node) {
+	usrloc_dmq_contacts_group_send(node);
+	usrloc_dmq_contacts_group_init();
+}
+
+int usrloc_dmq_send_multi_contact(ucontact_t* ptr, str aor, int action, dmq_node_t* node) {
+
+	usrloc_dmq_contacts_group_init();
+
+	srjson_doc_t *jdoc = &jdoc_contact_group.jdoc;
+	srjson_t *jdoc_contacts = jdoc_contact_group.jdoc_contacts;
+
+	int flags;
+	flags = ptr->flags;
+	flags &= ~FL_RPL;
+
+	srjson_t * jdoc_contact = srjson_CreateObject(jdoc);
+	if(!jdoc_contact) {
+		LM_ERR("cannot create json root\n");
+		return -1;
+	}
+	srjson_AddNumberToObject(jdoc, jdoc_contact, "action", action);
+	srjson_AddStrToObject(jdoc, jdoc_contact, "aor", aor.s, aor.len);
+	srjson_AddStrToObject(jdoc, jdoc_contact, "ruid", ptr->ruid.s, ptr->ruid.len);
+	srjson_AddStrToObject(jdoc, jdoc_contact, "c", ptr->c.s, ptr->c.len);
+	srjson_AddStrToObject(jdoc, jdoc_contact, "received", ptr->received.s, ptr->received.len);
+	srjson_AddStrToObject(jdoc, jdoc_contact, "path", ptr->path.s, ptr->path.len);
+	srjson_AddStrToObject(jdoc, jdoc_contact, "callid", ptr->callid.s, ptr->callid.len);
+	srjson_AddStrToObject(jdoc, jdoc_contact, "user_agent", ptr->user_agent.s, ptr->user_agent.len);
+	srjson_AddStrToObject(jdoc, jdoc_contact, "instance", ptr->instance.s, ptr->instance.len);
+	srjson_AddNumberToObject(jdoc, jdoc_contact, "expires", ptr->expires);
+	srjson_AddNumberToObject(jdoc, jdoc_contact, "cseq", ptr->cseq);
+	srjson_AddNumberToObject(jdoc, jdoc_contact, "flags", flags);
+	srjson_AddNumberToObject(jdoc, jdoc_contact, "cflags", ptr->cflags);
+	srjson_AddNumberToObject(jdoc, jdoc_contact, "q", ptr->q);
+	srjson_AddNumberToObject(jdoc, jdoc_contact, "last_modified", ptr->last_modified);
+	srjson_AddNumberToObject(jdoc, jdoc_contact, "methods", ptr->methods);
+	srjson_AddNumberToObject(jdoc, jdoc_contact, "reg_id", ptr->reg_id);
+
+	char idx[10];
+	jdoc_contact_group.count++;
+	snprintf(idx,10,"%d", jdoc_contact_group.count);
+	srjson_AddItemToObject(jdoc, jdoc_contacts, idx, jdoc_contact);
+
+	if (jdoc_contact_group.count >= _dmq_usrloc_batch_msg_contacts) {
+		usrloc_dmq_contacts_group_send(node);
+		usrloc_dmq_contacts_group_init();
+	}
+
+	return 0;
+}
+
+
+
+
 int usrloc_dmq_send_contact(ucontact_t* ptr, str aor, int action, dmq_node_t* node) {
 	srjson_doc_t jdoc;
 	srjson_InitDoc(&jdoc, NULL);
@@ -524,7 +666,7 @@ int usrloc_dmq_send_contact(ucontact_t* ptr, str aor, int action, dmq_node_t* no
 	int flags;
 
 	jdoc.root = srjson_CreateObject(&jdoc);
-	if(jdoc.root==NULL) {
+	if(!jdoc.root) {
 		LM_ERR("cannot create json root\n");
 		goto error;
 	}