|
@@ -18,6 +18,9 @@
|
|
|
* along with this program; if not, write to the Free Software
|
|
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
|
|
*
|
|
|
+ * Edited :
|
|
|
+ *
|
|
|
+ * Copyright (C) 2017 Julien Chavanton, Flowroute
|
|
|
*/
|
|
|
|
|
|
#include "usrloc_sync.h"
|
|
@@ -41,10 +44,14 @@ dmq_resp_cback_t usrloc_dmq_resp_callback = {&usrloc_dmq_resp_callback_f, 0};
|
|
|
int usrloc_dmq_send_all();
|
|
|
int usrloc_dmq_request_sync();
|
|
|
int usrloc_dmq_send_contact(ucontact_t* ptr, str aor, int action, dmq_node_t* node);
|
|
|
+int usrloc_dmq_send_multi_contact(ucontact_t* ptr, str aor, int action, dmq_node_t* node);
|
|
|
+void usrloc_dmq_send_multi_contact_flush(dmq_node_t* node);
|
|
|
|
|
|
#define MAX_AOR_LEN 256
|
|
|
|
|
|
extern int _dmq_usrloc_sync;
|
|
|
+extern int _dmq_usrloc_batch_msg_contacts;
|
|
|
+extern int _dmq_usrloc_batch_msg_size;
|
|
|
extern int _dmq_usrloc_batch_size;
|
|
|
extern int _dmq_usrloc_batch_usleep;
|
|
|
extern str _dmq_usrloc_domain;
|
|
@@ -240,7 +247,11 @@ void usrloc_get_all_ucontact(dmq_node_t* node)
|
|
|
LM_DBG("- AoR: %.*s AoRhash=%d Flags=%d\n", aor.len, aor.s, aorhash, flags);
|
|
|
|
|
|
while (ptr) {
|
|
|
- usrloc_dmq_send_contact(ptr, aor, DMQ_UPDATE, node);
|
|
|
+ if (_dmq_usrloc_batch_msg_contacts >1) {
|
|
|
+ usrloc_dmq_send_multi_contact(ptr, aor, DMQ_UPDATE, node);
|
|
|
+ } else {
|
|
|
+ usrloc_dmq_send_contact(ptr, aor, DMQ_UPDATE, node);
|
|
|
+ }
|
|
|
n++;
|
|
|
ptr = ptr->next;
|
|
|
}
|
|
@@ -254,6 +265,7 @@ void usrloc_get_all_ucontact(dmq_node_t* node)
|
|
|
}
|
|
|
}
|
|
|
dmq_usrloc_free(buf);
|
|
|
+ usrloc_dmq_send_multi_contact_flush(node); // send any remaining contacts
|
|
|
|
|
|
done:
|
|
|
c.s = ""; c.len = 0;
|
|
@@ -307,69 +319,17 @@ int usrloc_dmq_send(str* body, dmq_node_t* node) {
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
-
|
|
|
-/**
|
|
|
- * @brief ht dmq callback
|
|
|
- */
|
|
|
-int usrloc_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* node)
|
|
|
-{
|
|
|
- int content_length;
|
|
|
- str body;
|
|
|
- srjson_doc_t jdoc;
|
|
|
- srjson_t *it = NULL;
|
|
|
+static int usrloc_dmq_execute_action(srjson_t *jdoc_action, dmq_node_t* node) {
|
|
|
static ucontact_info_t ci;
|
|
|
-
|
|
|
- unsigned int action, expires, cseq, flags, cflags, q, last_modified,
|
|
|
- methods, reg_id;
|
|
|
- str aor=STR_NULL, ruid=STR_NULL, c=STR_NULL, received=STR_NULL,
|
|
|
- path=STR_NULL, callid=STR_NULL, user_agent=STR_NULL, instance=STR_NULL;
|
|
|
+ srjson_t *it = NULL;
|
|
|
+ unsigned int action, expires, cseq, flags, cflags, q, last_modified, methods, reg_id;
|
|
|
+ str aor=STR_NULL, ruid=STR_NULL, c=STR_NULL, received=STR_NULL, path=STR_NULL,
|
|
|
+ callid=STR_NULL, user_agent=STR_NULL, instance=STR_NULL;
|
|
|
|
|
|
action = expires = cseq = flags = cflags = q = last_modified
|
|
|
= methods = reg_id = 0;
|
|
|
|
|
|
- srjson_InitDoc(&jdoc, NULL);
|
|
|
- if(parse_from_header(msg)<0) {
|
|
|
- LM_ERR("failed to parse from header\n");
|
|
|
- goto invalid;
|
|
|
- }
|
|
|
- body = ((struct to_body*)msg->from->parsed)->uri;
|
|
|
-
|
|
|
- LM_DBG("dmq message received from %.*s\n", body.len, body.s);
|
|
|
-
|
|
|
- if(parse_headers(msg, HDR_EOH_F, 0)<0) {
|
|
|
- LM_ERR("failed to parse the headers\n");
|
|
|
- goto invalid;
|
|
|
- }
|
|
|
- if(!msg->content_length) {
|
|
|
- LM_ERR("no content length header found\n");
|
|
|
- goto invalid;
|
|
|
- }
|
|
|
- content_length = get_content_length(msg);
|
|
|
- if(!content_length) {
|
|
|
- LM_DBG("content length is 0\n");
|
|
|
- goto invalid;
|
|
|
- }
|
|
|
-
|
|
|
- body.s = get_body(msg);
|
|
|
- body.len = content_length;
|
|
|
-
|
|
|
- if (!body.s) {
|
|
|
- LM_ERR("unable to get body\n");
|
|
|
- goto error;
|
|
|
- }
|
|
|
-
|
|
|
- jdoc.buf = body;
|
|
|
- if(jdoc.root == NULL) {
|
|
|
- jdoc.root = srjson_Parse(&jdoc, jdoc.buf.s);
|
|
|
- if(jdoc.root == NULL)
|
|
|
- {
|
|
|
- LM_ERR("invalid json doc [[%s]]\n", jdoc.buf.s);
|
|
|
- goto invalid;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- for(it=jdoc.root->child; it; it = it->next)
|
|
|
- {
|
|
|
+ for(it=jdoc_action; it; it = it->next) {
|
|
|
if (it->string == NULL) continue;
|
|
|
|
|
|
if (strcmp(it->string, "action")==0) {
|
|
@@ -398,7 +358,7 @@ int usrloc_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t*
|
|
|
} else if (strcmp(it->string, "instance")==0) {
|
|
|
instance.s = it->valuestring;
|
|
|
instance.len = strlen(instance.s);
|
|
|
- } else if (strcmp(it->string, "expires")==0) { //
|
|
|
+ } else if (strcmp(it->string, "expires")==0) {
|
|
|
expires = SRJSON_GET_UINT(it);
|
|
|
} else if (strcmp(it->string, "cseq")==0) {
|
|
|
cseq = SRJSON_GET_UINT(it);
|
|
@@ -454,7 +414,71 @@ int usrloc_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t*
|
|
|
LM_DBG("Received DMQ_NONE. Not used...\n");
|
|
|
break;
|
|
|
default:
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+ return 1;
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+/**
|
|
|
+ * @brief ht dmq callback
|
|
|
+ */
|
|
|
+int usrloc_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* node)
|
|
|
+{
|
|
|
+ int content_length;
|
|
|
+ str body;
|
|
|
+ srjson_doc_t jdoc;
|
|
|
+
|
|
|
+ srjson_InitDoc(&jdoc, NULL);
|
|
|
+ if (parse_from_header(msg)<0) {
|
|
|
+ LM_ERR("failed to parse from header\n");
|
|
|
+ goto invalid;
|
|
|
+ }
|
|
|
+ body = ((struct to_body*)msg->from->parsed)->uri;
|
|
|
+
|
|
|
+ LM_DBG("dmq message received from %.*s\n", body.len, body.s);
|
|
|
+
|
|
|
+ if (parse_headers(msg, HDR_EOH_F, 0)<0) {
|
|
|
+ LM_ERR("failed to parse the headers\n");
|
|
|
+ goto invalid;
|
|
|
+ }
|
|
|
+ if (!msg->content_length) {
|
|
|
+ LM_ERR("no content length header found\n");
|
|
|
+ goto invalid;
|
|
|
+ }
|
|
|
+ content_length = get_content_length(msg);
|
|
|
+ if (!content_length) {
|
|
|
+ LM_DBG("content length is 0\n");
|
|
|
+ goto invalid;
|
|
|
+ }
|
|
|
+
|
|
|
+ body.s = get_body(msg);
|
|
|
+ body.len = content_length;
|
|
|
+
|
|
|
+ if (!body.s) {
|
|
|
+ LM_ERR("unable to get body\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+
|
|
|
+ jdoc.buf = body;
|
|
|
+ if (jdoc.root == NULL) {
|
|
|
+ jdoc.root = srjson_Parse(&jdoc, jdoc.buf.s);
|
|
|
+ if (jdoc.root == NULL) {
|
|
|
+ LM_ERR("invalid json doc [[%s]]\n", jdoc.buf.s);
|
|
|
goto invalid;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (strcmp(jdoc.root->child->string, "multi")==0) {
|
|
|
+ LM_DBG("request [%s]\n", jdoc.root->child->string);
|
|
|
+ srjson_t *jdoc_actions = jdoc.root->child->child;
|
|
|
+ srjson_t *it = NULL;
|
|
|
+ for(it=jdoc_actions; it; it = it->next) {
|
|
|
+ LM_DBG("action [%s]\n", jdoc_actions->child->string);
|
|
|
+ if (!usrloc_dmq_execute_action(it->child, node)) goto invalid;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ if (!usrloc_dmq_execute_action(jdoc.root->child, node)) goto invalid;
|
|
|
}
|
|
|
|
|
|
srjson_DestroyDoc(&jdoc);
|
|
@@ -517,6 +541,149 @@ error:
|
|
|
return -1;
|
|
|
}
|
|
|
|
|
|
+/* while prt append json string
|
|
|
+ * */
|
|
|
+
|
|
|
+/* Multi contacts */
|
|
|
+typedef struct jdoc_contact_group {
|
|
|
+ int count;
|
|
|
+ int size;
|
|
|
+ srjson_doc_t jdoc;
|
|
|
+ srjson_t *jdoc_contacts;
|
|
|
+} jdoc_contact_group_t;
|
|
|
+
|
|
|
+static jdoc_contact_group_t jdoc_contact_group;
|
|
|
+
|
|
|
+static void usrloc_dmq_contacts_group_init(void) {
|
|
|
+ if (jdoc_contact_group.jdoc.root)
|
|
|
+ return;
|
|
|
+ jdoc_contact_group.count = 0;
|
|
|
+ jdoc_contact_group.size = 12; // {"multi":{}}
|
|
|
+ srjson_InitDoc(&jdoc_contact_group.jdoc, NULL);
|
|
|
+ LM_DBG("init multi contacts batch. \n");
|
|
|
+ jdoc_contact_group.jdoc.root = srjson_CreateObject(&jdoc_contact_group.jdoc);
|
|
|
+ if (jdoc_contact_group.jdoc.root==NULL)
|
|
|
+ LM_ERR("cannot create json root ! \n");
|
|
|
+ jdoc_contact_group.jdoc_contacts = srjson_CreateObject(&jdoc_contact_group.jdoc);
|
|
|
+ if (jdoc_contact_group.jdoc_contacts==NULL) {
|
|
|
+ LM_ERR("cannot create json contacts ! \n");
|
|
|
+ srjson_DestroyDoc(&jdoc_contact_group.jdoc);
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+static void usrloc_dmq_contacts_group_send(dmq_node_t* node) {
|
|
|
+ if (jdoc_contact_group.count == 0)
|
|
|
+ return;
|
|
|
+ srjson_doc_t *jdoc = &jdoc_contact_group.jdoc;
|
|
|
+ srjson_t *jdoc_contacts = jdoc_contact_group.jdoc_contacts;
|
|
|
+
|
|
|
+ srjson_AddItemToObject(jdoc, jdoc->root, "multi", jdoc_contacts);
|
|
|
+
|
|
|
+ LM_DBG("json[%s]\n", srjson_PrintUnformatted(jdoc, jdoc->root));
|
|
|
+ jdoc->buf.s = srjson_PrintUnformatted(jdoc, jdoc->root);
|
|
|
+ if(jdoc->buf.s==NULL) {
|
|
|
+ LM_ERR("unable to serialize data\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ jdoc->buf.len = strlen(jdoc->buf.s);
|
|
|
+
|
|
|
+ LM_DBG("sending serialized data %.*s\n", jdoc->buf.len, jdoc->buf.s);
|
|
|
+ if (usrloc_dmq_send(&jdoc->buf, node)!=0) {
|
|
|
+ LM_ERR("unable to send data\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+
|
|
|
+ jdoc->free_fn(jdoc->buf.s);
|
|
|
+ jdoc->buf.s = NULL;
|
|
|
+ srjson_DestroyDoc(jdoc);
|
|
|
+ return;
|
|
|
+
|
|
|
+error:
|
|
|
+ if(jdoc->buf.s!=NULL) {
|
|
|
+ jdoc->free_fn(jdoc->buf.s);
|
|
|
+ jdoc->buf.s = NULL;
|
|
|
+ }
|
|
|
+ srjson_DestroyDoc(jdoc);
|
|
|
+ return;
|
|
|
+}
|
|
|
+
|
|
|
+void usrloc_dmq_send_multi_contact_flush(dmq_node_t* node) {
|
|
|
+ usrloc_dmq_contacts_group_send(node);
|
|
|
+ usrloc_dmq_contacts_group_init();
|
|
|
+}
|
|
|
+
|
|
|
+int usrloc_dmq_send_multi_contact(ucontact_t* ptr, str aor, int action, dmq_node_t* node) {
|
|
|
+
|
|
|
+ usrloc_dmq_contacts_group_init();
|
|
|
+
|
|
|
+ srjson_doc_t *jdoc = &jdoc_contact_group.jdoc;
|
|
|
+ srjson_t *jdoc_contacts = jdoc_contact_group.jdoc_contacts;
|
|
|
+
|
|
|
+ int flags;
|
|
|
+ flags = ptr->flags;
|
|
|
+ flags &= ~FL_RPL;
|
|
|
+
|
|
|
+ srjson_t * jdoc_contact = srjson_CreateObject(jdoc);
|
|
|
+ if(!jdoc_contact) {
|
|
|
+ LM_ERR("cannot create json root\n");
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ LM_DBG("group size[%d]\n", jdoc_contact_group.size);
|
|
|
+ jdoc_contact_group.size += 188; // json overhead ("":{"action":,"aor":"","ruid":"","c":""...)
|
|
|
+
|
|
|
+ srjson_AddNumberToObject(jdoc, jdoc_contact, "action", action);
|
|
|
+ jdoc_contact_group.size += snprintf(NULL,0,"%d", action);
|
|
|
+
|
|
|
+ srjson_AddStrToObject(jdoc, jdoc_contact, "aor", aor.s, aor.len);
|
|
|
+ jdoc_contact_group.size += aor.len;
|
|
|
+ srjson_AddStrToObject(jdoc, jdoc_contact, "ruid", ptr->ruid.s, ptr->ruid.len);
|
|
|
+ jdoc_contact_group.size += ptr->ruid.len;
|
|
|
+ srjson_AddStrToObject(jdoc, jdoc_contact, "c", ptr->c.s, ptr->c.len);
|
|
|
+ jdoc_contact_group.size += ptr->c.len;
|
|
|
+ srjson_AddStrToObject(jdoc, jdoc_contact, "received", ptr->received.s, ptr->received.len);
|
|
|
+ jdoc_contact_group.size += ptr->received.len;
|
|
|
+ srjson_AddStrToObject(jdoc, jdoc_contact, "path", ptr->path.s, ptr->path.len);
|
|
|
+ jdoc_contact_group.size += ptr->path.len;
|
|
|
+ srjson_AddStrToObject(jdoc, jdoc_contact, "callid", ptr->callid.s, ptr->callid.len);
|
|
|
+ jdoc_contact_group.size += ptr->callid.len;
|
|
|
+ srjson_AddStrToObject(jdoc, jdoc_contact, "user_agent", ptr->user_agent.s, ptr->user_agent.len);
|
|
|
+ jdoc_contact_group.size += ptr->user_agent.len;
|
|
|
+ srjson_AddStrToObject(jdoc, jdoc_contact, "instance", ptr->instance.s, ptr->instance.len);
|
|
|
+ jdoc_contact_group.size += ptr->instance.len;
|
|
|
+ srjson_AddNumberToObject(jdoc, jdoc_contact, "expires", ptr->expires);
|
|
|
+ jdoc_contact_group.size += snprintf(NULL,0,"%.0lf",(double)ptr->expires);
|
|
|
+ srjson_AddNumberToObject(jdoc, jdoc_contact, "cseq", ptr->cseq);
|
|
|
+ jdoc_contact_group.size += snprintf(NULL,0,"%d", ptr->cseq);
|
|
|
+ srjson_AddNumberToObject(jdoc, jdoc_contact, "flags", flags);
|
|
|
+ jdoc_contact_group.size += snprintf(NULL,0,"%d", flags);
|
|
|
+ srjson_AddNumberToObject(jdoc, jdoc_contact, "cflags", ptr->cflags);
|
|
|
+ jdoc_contact_group.size += snprintf(NULL,0,"%d", ptr->cflags);
|
|
|
+ srjson_AddNumberToObject(jdoc, jdoc_contact, "q", ptr->q);
|
|
|
+ jdoc_contact_group.size += snprintf(NULL,0,"%d", ptr->q);
|
|
|
+ srjson_AddNumberToObject(jdoc, jdoc_contact, "last_modified", ptr->last_modified);
|
|
|
+ jdoc_contact_group.size += snprintf(NULL,0,"%.0lf",(double)ptr->last_modified);
|
|
|
+ srjson_AddNumberToObject(jdoc, jdoc_contact, "methods", ptr->methods);
|
|
|
+ jdoc_contact_group.size += snprintf(NULL,0,"%u", ptr->methods);
|
|
|
+ srjson_AddNumberToObject(jdoc, jdoc_contact, "reg_id", ptr->reg_id);
|
|
|
+ jdoc_contact_group.size += snprintf(NULL,0,"%d", ptr->reg_id);
|
|
|
+
|
|
|
+ char idx[5];
|
|
|
+ jdoc_contact_group.count++;
|
|
|
+ jdoc_contact_group.size += snprintf(idx,5,"%d", jdoc_contact_group.count);
|
|
|
+ srjson_AddItemToObject(jdoc, jdoc_contacts, idx, jdoc_contact);
|
|
|
+
|
|
|
+ if (jdoc_contact_group.count >= _dmq_usrloc_batch_msg_contacts || jdoc_contact_group.size >= _dmq_usrloc_batch_msg_size) {
|
|
|
+ LM_DBG("sending group count[%d]size[%d]", jdoc_contact_group.count, jdoc_contact_group.size);
|
|
|
+ usrloc_dmq_contacts_group_send(node);
|
|
|
+ usrloc_dmq_contacts_group_init();
|
|
|
+ }
|
|
|
+
|
|
|
+ return 0;
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
int usrloc_dmq_send_contact(ucontact_t* ptr, str aor, int action, dmq_node_t* node) {
|
|
|
srjson_doc_t jdoc;
|
|
|
srjson_InitDoc(&jdoc, NULL);
|
|
@@ -524,7 +691,7 @@ int usrloc_dmq_send_contact(ucontact_t* ptr, str aor, int action, dmq_node_t* no
|
|
|
int flags;
|
|
|
|
|
|
jdoc.root = srjson_CreateObject(&jdoc);
|
|
|
- if(jdoc.root==NULL) {
|
|
|
+ if(!jdoc.root) {
|
|
|
LM_ERR("cannot create json root\n");
|
|
|
goto error;
|
|
|
}
|