2
0
Эх сурвалжийг харах

dmq_usrloc: param batch_max_msg_size

Julien Chavanton 8 жил өмнө
parent
commit
7cb707d672

+ 6 - 0
src/modules/dmq_usrloc/dmq_usrloc.c

@@ -38,6 +38,7 @@ int dmq_usrloc_enable = 0;
 int _dmq_usrloc_sync = 1;
 int _dmq_usrloc_batch_size = 0;
 int _dmq_usrloc_batch_msg_contacts = 1;
+int _dmq_usrloc_batch_msg_size = 60000;
 int _dmq_usrloc_batch_usleep = 0;
 str _dmq_usrloc_domain = str_init("location");
 
@@ -49,6 +50,7 @@ static param_export_t params[] = {
 	{"enable", INT_PARAM, &dmq_usrloc_enable},
 	{"sync",   INT_PARAM, &_dmq_usrloc_sync},
 	{"batch_msg_contacts",   INT_PARAM, &_dmq_usrloc_batch_msg_contacts},
+	{"batch_msg_size",   INT_PARAM, &_dmq_usrloc_batch_msg_size},
 	{"batch_size",   INT_PARAM, &_dmq_usrloc_batch_size},
 	{"batch_usleep", INT_PARAM, &_dmq_usrloc_batch_usleep},
 	{"usrloc_domain", PARAM_STR, &_dmq_usrloc_domain},
@@ -76,6 +78,10 @@ static int mod_init(void)
 	bind_usrloc_t bind_usrloc;
 	LM_INFO("dmq usrloc replication mode = %d\n", dmq_usrloc_enable);
 
+	if(_dmq_usrloc_batch_msg_size > 60000) {
+		LM_ERR("batch_msg_size too high[%d] setting to [60000]\n", _dmq_usrloc_batch_msg_size);
+		_dmq_usrloc_batch_msg_size = 60000;
+	}
 	if(_dmq_usrloc_batch_msg_contacts > 150) {
 		LM_ERR("batch_msg_contacts too high[%d] setting to [150]\n", _dmq_usrloc_batch_msg_contacts);
 		_dmq_usrloc_batch_msg_contacts = 150;

+ 25 - 4
src/modules/dmq_usrloc/doc/dmq_usrloc_admin.xml

@@ -174,6 +174,30 @@ modparam("dmq_usrloc", "batch_size", 4000)
 ...
 modparam("dmq_usrloc", "batch_usleep", 1000)
 ...
+</programlisting>
+	</example>
+	</section>
+	<section id="usrloc_dmq.p.batch_msg_size">
+		<title><varname>batch_msg_size</varname> (int)</title>
+		<para>
+			The parameter controls the size of the messages during a sync
+			This is to make sure the messages are never larger then 65536 (the maximum datagram size)
+
+			Note that batch_msg_contacts will also be checked
+		<para>
+		<emphasis>
+			Default value is 60000.
+			Maximum value is 60000.
+		</emphasis>
+		</para>
+		<example>
+		<title>Set <varname>batch_msg_size</varname> parameter</title>
+		<programlisting format="linespecific">
+...
+modparam("dmq_usrloc", "batch_msg_contacts", 50)  # 50 contacts / message
+modparam("dmq_usrloc", "batch_msg_size", 500000)
+# with this config, when doing a full sync, each message will be sent a soon as the body is larger 50K or contains 50 contacts
+...
 </programlisting>
 	</example>
 	</section>
@@ -182,10 +206,7 @@ modparam("dmq_usrloc", "batch_usleep", 1000)
 		<para>
 			The parameter controls the amount of contact per message/transaction during a sync
 
-			Note that the default maximum size of a contact is 1024, however once converted in the json format
-			we can add 200 characters to reach 1224 (this is only an approximation).
-			Considering 65536 (the maximum datagram size)/1224 = 53,
-			we need to leave some space for UDP and SIP headers, staying &lt;= 50 seems safe in most cases
+			Note that batch_msg_size will also be checked
 		<para>
 		<emphasis>
 			Default value is 1.

+ 32 - 7
src/modules/dmq_usrloc/usrloc_sync.c

@@ -51,6 +51,7 @@ void usrloc_dmq_send_multi_contact_flush(dmq_node_t* node);
 
 extern int _dmq_usrloc_sync;
 extern int _dmq_usrloc_batch_msg_contacts;
+extern int _dmq_usrloc_batch_msg_size;
 extern int _dmq_usrloc_batch_size;
 extern int _dmq_usrloc_batch_usleep;
 extern str _dmq_usrloc_domain;
@@ -318,7 +319,7 @@ int usrloc_dmq_send(str* body, dmq_node_t* node) {
 	return 0;
 }
 
-inline static int usrloc_dmq_execute_action(srjson_t *jdoc_action, dmq_node_t* node) {
+static int usrloc_dmq_execute_action(srjson_t *jdoc_action, dmq_node_t* node) {
 	static ucontact_info_t ci;
 	srjson_t *it = NULL;
 	unsigned int action, expires, cseq, flags, cflags, q, last_modified, methods, reg_id;
@@ -546,6 +547,7 @@ error:
 /* Multi contacts */
 typedef struct jdoc_contact_group {
 	int count;
+	int size;
 	srjson_doc_t jdoc;
 	srjson_t *jdoc_contacts;
 } jdoc_contact_group_t;
@@ -556,6 +558,7 @@ static void usrloc_dmq_contacts_group_init(void) {
 	if (jdoc_contact_group.jdoc.root)
 		return;
 	jdoc_contact_group.count = 0;
+	jdoc_contact_group.size = 12; // {"multi":{}}
 	srjson_InitDoc(&jdoc_contact_group.jdoc, NULL);
 	LM_DBG("init multi contacts batch. \n");
 	jdoc_contact_group.jdoc.root = srjson_CreateObject(&jdoc_contact_group.jdoc);
@@ -568,7 +571,7 @@ static void usrloc_dmq_contacts_group_init(void) {
 	}
 }
 
-inline static void usrloc_dmq_contacts_group_send(dmq_node_t* node) {
+static void usrloc_dmq_contacts_group_send(dmq_node_t* node) {
 	if (jdoc_contact_group.count == 0)
 		return;
 	srjson_doc_t *jdoc = &jdoc_contact_group.jdoc;
@@ -576,7 +579,7 @@ inline static void usrloc_dmq_contacts_group_send(dmq_node_t* node) {
 
 	srjson_AddItemToObject(jdoc, jdoc->root, "multi", jdoc_contacts);
 
-	LM_DBG("json[%s]\n", srjson_Print(jdoc, jdoc->root));
+	LM_DBG("json[%s]\n", srjson_PrintUnformatted(jdoc, jdoc->root));
 	jdoc->buf.s = srjson_PrintUnformatted(jdoc, jdoc->root);
 	if(jdoc->buf.s==NULL) {
 		LM_ERR("unable to serialize data\n");
@@ -586,7 +589,7 @@ inline static void usrloc_dmq_contacts_group_send(dmq_node_t* node) {
 
 	LM_DBG("sending serialized data %.*s\n", jdoc->buf.len, jdoc->buf.s);
 	if (usrloc_dmq_send(&jdoc->buf, node)!=0) {
-		LM_ERR("unable to send data, sleeping 100ms\n");
+		LM_ERR("unable to send data\n");
 		goto error;
 	}
 
@@ -625,30 +628,52 @@ int usrloc_dmq_send_multi_contact(ucontact_t* ptr, str aor, int action, dmq_node
 		LM_ERR("cannot create json root\n");
 		return -1;
 	}
+	LM_DBG("group size[%d]\n", jdoc_contact_group.size);
+	jdoc_contact_group.size += 188; // json overhead ("":{"action":,"aor":"","ruid":"","c":""...)
+
 	srjson_AddNumberToObject(jdoc, jdoc_contact, "action", action);
+	jdoc_contact_group.size += snprintf(NULL,0,"%d", action);
+
 	srjson_AddStrToObject(jdoc, jdoc_contact, "aor", aor.s, aor.len);
+	jdoc_contact_group.size += aor.len;
 	srjson_AddStrToObject(jdoc, jdoc_contact, "ruid", ptr->ruid.s, ptr->ruid.len);
+	jdoc_contact_group.size += ptr->ruid.len;
 	srjson_AddStrToObject(jdoc, jdoc_contact, "c", ptr->c.s, ptr->c.len);
+	jdoc_contact_group.size += ptr->c.len;
 	srjson_AddStrToObject(jdoc, jdoc_contact, "received", ptr->received.s, ptr->received.len);
+	jdoc_contact_group.size += ptr->received.len;
 	srjson_AddStrToObject(jdoc, jdoc_contact, "path", ptr->path.s, ptr->path.len);
+	jdoc_contact_group.size += ptr->path.len;
 	srjson_AddStrToObject(jdoc, jdoc_contact, "callid", ptr->callid.s, ptr->callid.len);
+	jdoc_contact_group.size += ptr->callid.len;
 	srjson_AddStrToObject(jdoc, jdoc_contact, "user_agent", ptr->user_agent.s, ptr->user_agent.len);
+	jdoc_contact_group.size += ptr->user_agent.len;
 	srjson_AddStrToObject(jdoc, jdoc_contact, "instance", ptr->instance.s, ptr->instance.len);
+	jdoc_contact_group.size += ptr->instance.len;
 	srjson_AddNumberToObject(jdoc, jdoc_contact, "expires", ptr->expires);
+	jdoc_contact_group.size += snprintf(NULL,0,"%.0lf",(double)ptr->expires);
 	srjson_AddNumberToObject(jdoc, jdoc_contact, "cseq", ptr->cseq);
+	jdoc_contact_group.size += snprintf(NULL,0,"%d", ptr->cseq);
 	srjson_AddNumberToObject(jdoc, jdoc_contact, "flags", flags);
+	jdoc_contact_group.size += snprintf(NULL,0,"%d", flags);
 	srjson_AddNumberToObject(jdoc, jdoc_contact, "cflags", ptr->cflags);
+	jdoc_contact_group.size += snprintf(NULL,0,"%d", ptr->cflags);
 	srjson_AddNumberToObject(jdoc, jdoc_contact, "q", ptr->q);
+	jdoc_contact_group.size += snprintf(NULL,0,"%d", ptr->q);
 	srjson_AddNumberToObject(jdoc, jdoc_contact, "last_modified", ptr->last_modified);
+	jdoc_contact_group.size += snprintf(NULL,0,"%.0lf",(double)ptr->last_modified);
 	srjson_AddNumberToObject(jdoc, jdoc_contact, "methods", ptr->methods);
+	jdoc_contact_group.size += snprintf(NULL,0,"%u", ptr->methods);
 	srjson_AddNumberToObject(jdoc, jdoc_contact, "reg_id", ptr->reg_id);
+	jdoc_contact_group.size += snprintf(NULL,0,"%d", ptr->reg_id);
 
-	char idx[10];
+	char idx[5];
 	jdoc_contact_group.count++;
-	snprintf(idx,10,"%d", jdoc_contact_group.count);
+	jdoc_contact_group.size += snprintf(idx,5,"%d", jdoc_contact_group.count);
 	srjson_AddItemToObject(jdoc, jdoc_contacts, idx, jdoc_contact);
 
-	if (jdoc_contact_group.count >= _dmq_usrloc_batch_msg_contacts) {
+	if (jdoc_contact_group.count >= _dmq_usrloc_batch_msg_contacts || jdoc_contact_group.size >= _dmq_usrloc_batch_msg_size) {
+		LM_DBG("sending group count[%d]size[%d]", jdoc_contact_group.count, jdoc_contact_group.size);
 		usrloc_dmq_contacts_group_send(node);
 		usrloc_dmq_contacts_group_init();
 	}