Browse Source

Merge pull request #3 from AndreyRybkin/master

usrloc_dmq: add module for usrloc sync via dmq
Daniel-Constantin Mierla 10 years ago
parent
commit
2f1c30a545

+ 9 - 0
modules/usrloc_dmq/Makefile

@@ -0,0 +1,9 @@
+include ../../Makefile.defs
+auto_gen=
+NAME=usrloc_dmq.so
+
+DEFS+=-DKAMAILIO_MOD_INTERFACE
+
+SERLIBPATH=../../lib
+SER_LIBS+=$(SERLIBPATH)/srutils/srutils
+include ../../Makefile.modules

+ 4 - 0
modules/usrloc_dmq/doc/Makefile

@@ -0,0 +1,4 @@
+docs = usrloc_dmq.xml
+
+docbook_dir = ../../../docbook
+include $(docbook_dir)/Makefile.module

+ 48 - 0
modules/usrloc_dmq/doc/usrloc_dmq.xml

@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding='ISO-8859-1'?>
+<!DOCTYPE book PUBLIC "-//OASIS//DTD DocBook XML V4.4//EN"
+"http://www.oasis-open.org/docbook/xml/4.4/docbookx.dtd" [
+
+<!-- Include general documentation entities -->
+<!ENTITY % docentities SYSTEM "../../../docbook/entities.xml">
+%docentities;
+
+]>
+
+<book xmlns:xi="http://www.w3.org/2001/XInclude">
+    <bookinfo>
+	<title>usrloc_dmq Module</title>
+	<productname class="trade">&kamailioname;</productname>
+	<authorgroup>
+	    <author>
+		<firstname>Andrey</firstname>
+		<surname>Rybkin</surname>
+		<affiliation><orgname>bks.tv</orgname></affiliation>
+		<email>[email protected]</email>
+		<address>
+		<otheraddr>
+		<ulink></ulink>
+		</otheraddr>
+		</address>
+	    </author>
+	    <editor>
+		<firstname>Andrey</firstname>
+		<surname>Rybkin</surname>
+		<affiliation><orgname>bks.tv</orgname></affiliation>
+		<email>[email protected]</email>
+		<address>
+		<otheraddr>
+		<ulink></ulink>
+		</otheraddr>
+		</address>
+	    </editor>
+	</authorgroup>
+	<copyright>
+	    <year>2014</year>
+	</copyright>
+    </bookinfo>
+    <toc></toc>
+    
+    <xi:include href="usrloc_dmq_admin.xml"/>
+    
+    
+</book>

+ 74 - 0
modules/usrloc_dmq/doc/usrloc_dmq_admin.xml

@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding='ISO-8859-1'?>
+<!DOCTYPE book PUBLIC "-//OASIS//DTD DocBook XML V4.4//EN"
+"http://www.oasis-open.org/docbook/xml/4.4/docbookx.dtd" [
+
+<!-- Include general documentation entities -->
+<!ENTITY % docentities SYSTEM "../../../docbook/entities.xml">
+%docentities;
+
+]>
+<!-- Module User's Guide -->
+
+<chapter>
+	
+	<title>&adminguide;</title>
+	
+	<section>
+	<title>Overview</title>
+	<para>
+		The module add usrloc contacts replication between multiple servers via DMQ module.
+	</para>
+	</section>
+	<section>
+	<title>Dependencies</title>
+	<section>
+		<title>&kamailio; Modules</title>
+		<para>
+		The following modules must be loaded before this module:
+			<itemizedlist>
+			<listitem>
+			<para>
+				<emphasis>DMQ module must be loaded first.</emphasis>.
+				<emphasis>USRLOC module must be loaded first.</emphasis>.
+			</para>
+			</listitem>
+			</itemizedlist>
+		</para>
+	</section>
+	<section>
+	<title>Parameters</title>
+	<section id="usrloc_dmq.p.enable">
+		<title><varname>enable</varname> (int)</title>
+		<para>
+		USRLOC replication
+			0 - disabled
+			1 - enabled
+		</para>
+		<emphasis>
+			Default value is 0.
+		</emphasis>
+		</para>
+	</section>
+	<section id="usrloc_dmq.p.flag">
+		<title><varname>flag</varname> (int)</title>
+		<para>
+			Flag to be used for marking if a contact should be constructed for the DMQ
+		</para>
+		<para>
+		<emphasis>
+			Default value is 2.
+		</emphasis>
+		</para>
+		<example>
+		<title>Set <varname>flag</varname> parameter</title>
+		<programlisting format="linespecific">
+...
+modparam("usrloc_dmq", "flag", 2)
+...
+</programlisting>
+		</example>
+	</section>
+	</section>
+
+</chapter>
+

+ 87 - 0
modules/usrloc_dmq/usrloc_dmq.c

@@ -0,0 +1,87 @@
+#include <stdio.h>
+#include "../../sr_module.h"
+#include "../../dprint.h"
+#include "../../error.h"
+#include "../../modules/usrloc/usrloc.h"
+#include "../usrloc/ul_callback.h"
+#include "../../modules/sl/sl.h"
+#include "../../mod_fix.h"
+
+#include "usrloc_sync.h"
+
+static int mod_init(void); 
+static int child_init(int);
+
+int enable_usrloc = 0;
+int usrloc_syncflag = 2;
+
+MODULE_VERSION
+
+static param_export_t params[] = {
+	{"enable", INT_PARAM, &enable_usrloc},
+	{"flag", INT_PARAM, &usrloc_syncflag},
+	{0, 0, 0}
+};
+
+struct module_exports exports = {
+	"usrloc_dmq",				/* module name */
+	DEFAULT_DLFLAGS,		/* dlopen flags */
+	0,						/* exported functions */
+	params,					/* exported parameters */
+	0,						/* exported statistics */
+	0,   					/* exported MI functions */
+	0,						/* exported pseudo-variables */
+	0,						/* extra processes */
+	mod_init,				/* module initialization function */
+	0,   					/* response handling function */
+	0, 						/* destroy function */
+	child_init              /* per-child init function */
+};
+
+
+static int mod_init(void)
+{
+		LM_ERR("dmq_sync loaded: usrloc=%d\n", enable_usrloc);
+		
+		if (enable_usrloc) {
+			usrloc_dmq_flag = 1 << usrloc_syncflag;
+			bind_usrloc_t bind_usrloc;
+			
+			bind_usrloc = (bind_usrloc_t)find_export("ul_bind_usrloc", 1, 0);
+			if (!bind_usrloc) {
+  				LM_ERR("can't bind usrloc\n");
+				return -1;
+			}
+			if (bind_usrloc(&ul) < 0) {
+				LM_ERR("Can't bind ul\n");
+                return -1;
+			}			
+			if(ul.register_ulcb != NULL) {
+				if(ul.register_ulcb(ULCB_MAX, ul_cb_contact, 0)< 0)
+				{
+					LM_ERR("can not register callback for expired contacts\n");
+					return -1;
+				}
+			}					
+			if (!usrloc_dmq_initialize()){
+				LM_DBG("usrloc_dmq initialized\n");
+			} else {
+				LM_ERR("Error in usrloc_dmq_initialize()\n");
+			}
+		}
+		return 0;
+}
+
+static int child_init(int rank)
+{
+
+	if (rank == PROC_MAIN) {
+		LM_ERR("child_init PROC_MAIN\n");
+		return 0;
+	}
+	if(rank == PROC_INIT || rank == PROC_TCP_MAIN) {
+		LM_ERR("child_init PROC_INIT\n");
+		return 0;
+	}
+	return 0;
+}

+ 541 - 0
modules/usrloc_dmq/usrloc_sync.c

@@ -0,0 +1,541 @@
+#include "usrloc_sync.h"
+#include "../usrloc/usrloc.h"
+#include "../usrloc/ul_callback.h"
+#include "../usrloc/dlist.h"
+#include "../../dprint.h"
+#include "../../parser/parse_from.h"
+#include "../../parser/parse_addr_spec.h"
+
+static str usrloc_dmq_content_type = str_init("application/json");
+static str dmq_200_rpl  = str_init("OK");
+static str dmq_400_rpl  = str_init("Bad Request");
+static str dmq_500_rpl  = str_init("Server Internal Error");
+
+dmq_api_t usrloc_dmqb;
+dmq_peer_t* usrloc_dmq_peer = NULL;
+dmq_resp_cback_t usrloc_dmq_resp_callback = {&usrloc_dmq_resp_callback_f, 0};
+
+int usrloc_dmq_send_all();
+int usrloc_dmq_request_sync();
+int usrloc_dmq_send_contact(ucontact_t* ptr, str aor, int action, dmq_node_t* node);
+usrloc_api_t ul;
+
+#define MAX_AOR_LEN 256
+int extract_aor(str* _uri, str* _a, sip_uri_t *_pu)
+{
+	static char aor_buf[MAX_AOR_LEN];
+	sip_uri_t turi;
+	sip_uri_t *puri;
+	str *uri;
+	
+	memset(aor_buf, 0, MAX_AOR_LEN);
+	uri=_uri;
+
+	if(_pu!=NULL)
+		puri = _pu;
+	else
+		puri = &turi;
+
+	if (parse_uri(uri->s, uri->len, puri) < 0) {
+		LM_ERR("failed to parse AoR [%.*s]\n", uri->len, uri->s);
+		return -1;
+	}
+	
+	if ( (puri->user.len + puri->host.len + 1) > MAX_AOR_LEN) {
+		LM_ERR("Address Of Record too long\n");
+		return -2;
+	}
+
+	_a->s = aor_buf;
+	_a->len = puri->user.len;
+
+	if (un_escape(&puri->user, _a) < 0) {
+		LM_ERR("failed to unescape username\n");
+		return -3;
+	}
+
+	strlower(_a);
+
+	return 0;
+}
+
+int add_contact(str aor, ucontact_info_t* ci)
+{
+	urecord_t* r;
+	udomain_t* _d;	
+	ucontact_t* c;
+	str contact;
+	int res;
+	
+	ul.get_udomain("location", &_d);
+	ul.lock_udomain(_d, &aor);
+	res = ul.get_urecord(_d, &aor, &r);
+	if (res < 0) {
+		LM_ERR("failed to retrieve record from usrloc\n");
+		goto error;
+	} else if ( res == 0) {
+		LM_DBG("'%.*s' found in usrloc\n", aor.len, ZSW(aor.s));
+		res = ul.get_ucontact_by_instance(r, &aor, ci, &c);
+		LM_DBG("get_ucontact_by_instance = %d\n", res);
+		if (res==-1) {
+			LM_ERR("Invalid cseq\n");
+			goto error;
+		} else if (res > 0 ) {
+			LM_DBG("Not found contact\n");
+			ul.insert_ucontact(r, &contact, ci, &c);
+		} else if (res == 0) {
+			LM_DBG("Found contact\n");
+			ul.update_ucontact(r, c, ci);
+		}
+	} else {
+		LM_DBG("'%.*s' Not found in usrloc\n", aor.len, ZSW(aor.s));
+		ul.insert_urecord(_d, &aor, &r);
+		LM_DBG("Insert record\n");
+		contact.s = ci->c->s;
+		contact.len = ci->c->len;
+		ul.insert_ucontact(r, &contact, ci, &c);
+		LM_DBG("Insert ucontact\n");
+	}	
+	
+		LM_DBG("Release record\n");
+		ul.release_urecord(r);
+		LM_DBG("Unlock udomain\n");
+		ul.unlock_udomain(_d, &aor);
+		return 0;	
+	error:
+		ul.unlock_udomain(_d, &aor);
+		return -1;
+}
+
+void usrloc_get_all_ucontact(dmq_node_t* node)
+{
+ 	int rval, len=0;
+	void *buf, *cp;
+	str c;
+	str path;
+	str ruid;
+	unsigned int aorhash;
+	struct socket_info* send_sock;
+	unsigned int flags;
+	
+	len = 0;
+	buf = NULL;
+
+    if (ul.get_all_ucontacts == NULL){
+        LM_ERR("ul.get_all_ucontacts is NULL\n");
+        goto done;
+    }
+	rval = ul.get_all_ucontacts(buf, len, 0, 0, 1);
+	if (rval<0) {
+		LM_ERR("failed to fetch contacts\n");
+		goto done;
+	}
+	if (rval > 0) {
+		if (buf != NULL)
+			pkg_free(buf);
+		len = rval * 2;
+		buf = pkg_malloc(len);
+		if (buf == NULL) {
+			LM_ERR("out of pkg memory\n");
+			goto done;
+		}
+		rval = ul.get_all_ucontacts(buf, len, 0, 0, 1);
+		if (rval != 0) {
+			pkg_free(buf);
+			goto done;
+		}
+	}
+	if (buf == NULL)
+		goto done;	
+	cp = buf;
+    while (1) {
+        memcpy(&(c.len), cp, sizeof(c.len));
+        if (c.len == 0)
+            break;
+        c.s = (char*)cp + sizeof(c.len);
+        cp =  (char*)cp + sizeof(c.len) + c.len;
+        memcpy( &send_sock, cp, sizeof(send_sock));
+        cp = (char*)cp + sizeof(send_sock);
+        memcpy( &flags, cp, sizeof(flags));
+        cp = (char*)cp + sizeof(flags);
+        memcpy( &(path.len), cp, sizeof(path.len));
+        path.s = path.len ? ((char*)cp + sizeof(path.len)) : NULL ;
+        cp =  (char*)cp + sizeof(path.len) + path.len;
+        memcpy( &(ruid.len), cp, sizeof(ruid.len));
+        ruid.s = ruid.len ? ((char*)cp + sizeof(ruid.len)) : NULL ;
+        cp =  (char*)cp + sizeof(ruid.len) + ruid.len;
+        memcpy( &aorhash, cp, sizeof(aorhash));
+        cp = (char*)cp + sizeof(aorhash);
+
+
+        str aor;
+        sip_uri_t puri;
+        urecord_t* r;
+        udomain_t* _d;
+        ucontact_t* ptr = 0;
+
+        int res;
+
+        if (extract_aor(&c, &aor, &puri) < 0) {
+            LM_ERR("failed to extract address of record\n");
+            continue;
+        }
+        ul.get_udomain("location", &_d);
+
+        res = ul.get_urecord_by_ruid(_d, aorhash, &ruid, &r, &ptr);
+        aor = r->aor;
+        if (res > 0) {
+            LM_DBG("'%.*s' Not found in usrloc\n", aor.len, ZSW(aor.s));
+            ul.release_urecord(r);
+            ul.unlock_udomain(_d, &aor);
+            continue;
+        }
+        LM_DBG("- AoR: %.*s  AoRhash=%d  Flags=%d\n", aor.len, aor.s, aorhash, flags);
+
+        while (ptr) {
+            usrloc_dmq_send_contact(ptr, aor, DMQ_UPDATE, node);
+            ptr = ptr->next;
+        }
+        ul.release_urecord(r);
+        ul.unlock_udomain(_d, &aor);
+    }
+	pkg_free(buf);
+	
+done:
+	c.s = ""; c.len = 0;
+}
+
+
+int usrloc_dmq_initialize()
+{
+	dmq_peer_t not_peer;
+	
+	/* load the DMQ API */
+	if (dmq_load_api(&usrloc_dmqb)!=0) {
+		LM_ERR("cannot load dmq api\n");
+		return -1;
+	} else {
+		LM_DBG("loaded dmq api\n");
+	}
+	not_peer.callback = usrloc_dmq_handle_msg;
+	not_peer.init_callback = usrloc_dmq_request_sync;
+	not_peer.description.s = "usrloc";
+	not_peer.description.len = 6;
+	not_peer.peer_id.s = "usrloc";
+	not_peer.peer_id.len = 6;
+	usrloc_dmq_peer = usrloc_dmqb.register_dmq_peer(&not_peer);
+	if(!usrloc_dmq_peer) {
+		LM_ERR("error in register_dmq_peer\n");
+		goto error;
+	} else {
+		LM_DBG("dmq peer registered\n");
+	}
+	return 0;
+error:
+	return -1;
+}
+
+
+int usrloc_dmq_send(str* body, dmq_node_t* node) {
+	if (!usrloc_dmq_peer) {
+		LM_ERR("dlg_dmq_peer is null!\n");
+		return -1;
+	}
+	if (node) {
+		LM_DBG("sending dmq message ...\n");
+		usrloc_dmqb.send_message(usrloc_dmq_peer, body, node, &usrloc_dmq_resp_callback, 1, &usrloc_dmq_content_type);
+	} else {
+		LM_DBG("sending dmq broadcast...\n");
+		usrloc_dmqb.bcast_message(usrloc_dmq_peer, body, 0, &usrloc_dmq_resp_callback, 1, &usrloc_dmq_content_type);
+	}
+	return 0;
+}
+
+
+/**
+* @brief ht dmq callback
+*/
+int usrloc_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* node)
+{
+	int content_length;
+	str body;
+	srjson_doc_t jdoc;
+	srjson_t *it = NULL;
+	static ucontact_info_t ci;
+
+	int action, expires, cseq, flags, cflags, q, last_modified, methods, reg_id;
+	str aor, ruid, c, received, path, callid, user_agent, instance;
+
+	parse_from_header(msg);
+	body = ((struct to_body*)msg->from->parsed)->uri;
+	
+	LM_DBG("dmq message received from %.*s\n", body.len, body.s);
+
+	if(!msg->content_length) {
+		LM_ERR("no content length header found\n");
+		goto invalid;
+	}
+	content_length = get_content_length(msg);
+	if(!content_length) {
+		LM_DBG("content length is 0\n");
+		goto invalid;
+	}
+
+	body.s = get_body(msg);
+	body.len = content_length;
+
+	if (!body.s) {
+		LM_ERR("unable to get body\n");
+		goto error;
+	}
+
+	srjson_InitDoc(&jdoc, NULL);
+	jdoc.buf = body;
+	if(jdoc.root == NULL) {
+		jdoc.root = srjson_Parse(&jdoc, jdoc.buf.s);
+		if(jdoc.root == NULL)
+		{
+			LM_ERR("invalid json doc [[%s]]\n", jdoc.buf.s);
+			goto invalid;
+		}
+	}
+	
+	for(it=jdoc.root->child; it; it = it->next)
+	{
+		if (it->string == NULL) continue;
+		
+		if (strcmp(it->string, "action")==0) {
+			action = it->valueint;
+		} else if (strcmp(it->string, "aor")==0) {
+			aor.s = it->valuestring;
+			aor.len = strlen(aor.s);
+		} else if (strcmp(it->string, "ruid")==0) {
+			ruid.s = it->valuestring;
+			ruid.len = strlen(ruid.s);
+		} else if (strcmp(it->string, "c")==0) {
+			c.s = it->valuestring;
+			c.len = strlen(c.s);
+		} else if (strcmp(it->string, "received")==0) {
+			received.s = it->valuestring;
+			received.len = strlen(received.s);
+		} else if (strcmp(it->string, "path")==0) {
+			path.s = it->valuestring;
+			path.len = strlen(path.s);
+		} else if (strcmp(it->string, "callid")==0) {
+			callid.s = it->valuestring;
+			callid.len = strlen(callid.s);
+		} else if (strcmp(it->string, "user_agent")==0) {
+			user_agent.s = it->valuestring;
+			user_agent.len = strlen(user_agent.s);
+		} else if (strcmp(it->string, "instance")==0) {
+			instance.s = it->valuestring;
+			instance.len = strlen(instance.s);
+		} else if (strcmp(it->string, "expires")==0) { //
+			expires = it->valueint;
+		} else if (strcmp(it->string, "cseq")==0) {
+			cseq = it->valueint;
+		} else if (strcmp(it->string, "flags")==0) {
+			flags = it->valueint;
+		} else if (strcmp(it->string, "cflags")==0) {
+			cflags = it->valueint;
+		} else if (strcmp(it->string, "q")==0) {
+			q = it->valueint;
+		} else if (strcmp(it->string, "last_modified")==0) {
+			last_modified = it->valueint;
+		} else if (strcmp(it->string, "methods")==0) {
+			methods = it->valueint;
+		} else if (strcmp(it->string, "reg_id")==0) {
+			reg_id = it->valueint;
+		} else {
+			LM_ERR("unrecognized field in json object\n");
+		}		
+	} 
+	srjson_DestroyDoc(&jdoc);
+	memset( &ci, 0, sizeof(ucontact_info_t));
+	ci.ruid = ruid;
+	ci.c = &c;
+	ci.received = received;
+	ci.path = &path;
+	ci.expires = expires;
+	ci.q = q;
+	ci.callid = &callid;
+	ci.cseq = cseq;
+	ci.flags = flags;
+	ci.flags |= usrloc_dmq_flag;
+	ci.cflags = cflags;
+	ci.user_agent = &user_agent;
+	ci.methods = methods;
+	ci.instance = instance;
+	ci.reg_id = reg_id;
+	ci.tcpconn_id = -1;
+	ci.last_modified = last_modified;
+
+	switch(action) {
+		case DMQ_UPDATE:
+						LM_DBG("Received DMQ_UPDATE. Update contact info...\n");
+						add_contact(aor, &ci);
+						break;
+		case DMQ_RM:
+						LM_DBG("Received DMQ_RM. Delete contact info...\n");
+						break;
+		case DMQ_SYNC:
+						LM_DBG("Received DMQ_SYNC. Sending all contacts...\n");
+						usrloc_get_all_ucontact(node);
+						break;
+		case DMQ_NONE:
+						LM_DBG("Received DMQ_NONE. Not used...\n");
+						break;
+						
+		default:  goto invalid;
+	}
+	
+	resp->reason = dmq_200_rpl;
+	resp->resp_code = 200;
+	return 0;
+
+invalid:
+	resp->reason = dmq_400_rpl;
+	resp->resp_code = 400;
+	return 0;
+
+error:
+	resp->reason = dmq_500_rpl;
+	resp->resp_code = 500;
+	return 0;
+}
+
+
+int usrloc_dmq_request_sync() {
+	srjson_doc_t jdoc;
+	LM_DBG("requesting sync from dmq peers\n");
+	srjson_InitDoc(&jdoc, NULL);
+
+	jdoc.root = srjson_CreateObject(&jdoc);
+	if(jdoc.root==NULL) {
+		LM_ERR("cannot create json root\n");
+		goto error;
+	}
+
+	srjson_AddNumberToObject(&jdoc, jdoc.root, "action", DMQ_SYNC);
+	jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root);
+	if(jdoc.buf.s==NULL) {
+		LM_ERR("unable to serialize data\n");
+		goto error;
+	}
+	jdoc.buf.len = strlen(jdoc.buf.s);
+	LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
+	if (usrloc_dmq_send(&jdoc.buf, 0)!=0) {
+		goto error;
+	}
+
+	jdoc.free_fn(jdoc.buf.s);
+	jdoc.buf.s = NULL;
+	srjson_DestroyDoc(&jdoc);
+	return 0;
+
+error:
+	if(jdoc.buf.s!=NULL) {
+		jdoc.free_fn(jdoc.buf.s);
+		jdoc.buf.s = NULL;
+	}
+	srjson_DestroyDoc(&jdoc);
+	return -1;
+}
+
+int usrloc_dmq_send_contact(ucontact_t* ptr, str aor, int action, dmq_node_t* node) {
+	srjson_doc_t jdoc;
+	srjson_InitDoc(&jdoc, NULL);
+	
+	int flags;
+
+	jdoc.root = srjson_CreateObject(&jdoc);
+	if(jdoc.root==NULL) {
+		LM_ERR("cannot create json root\n");
+		goto error;
+	}
+
+	flags = ptr->flags;
+	flags &= ~usrloc_dmq_flag;
+
+	srjson_AddNumberToObject(&jdoc, jdoc.root, "action", action);
+	
+	srjson_AddStrToObject(&jdoc, jdoc.root, "aor", aor.s, aor.len);
+	srjson_AddStrToObject(&jdoc, jdoc.root, "ruid", ptr->ruid.s, ptr->ruid.len);
+	srjson_AddStrToObject(&jdoc, jdoc.root, "c", ptr->c.s, ptr->c.len);
+	srjson_AddStrToObject(&jdoc, jdoc.root, "received", ptr->received.s, ptr->received.len);
+	srjson_AddStrToObject(&jdoc, jdoc.root, "path", ptr->path.s, ptr->path.len);
+	srjson_AddStrToObject(&jdoc, jdoc.root, "callid", ptr->callid.s, ptr->callid.len);
+	srjson_AddStrToObject(&jdoc, jdoc.root, "user_agent", ptr->user_agent.s, ptr->user_agent.len);
+	srjson_AddStrToObject(&jdoc, jdoc.root, "instance", ptr->instance.s, ptr->instance.len);
+	srjson_AddNumberToObject(&jdoc, jdoc.root, "expires", ptr->expires);
+	srjson_AddNumberToObject(&jdoc, jdoc.root, "cseq", ptr->cseq);
+	srjson_AddNumberToObject(&jdoc, jdoc.root, "flags", flags);
+	srjson_AddNumberToObject(&jdoc, jdoc.root, "cflags", ptr->cflags);
+	srjson_AddNumberToObject(&jdoc, jdoc.root, "q", ptr->q);
+	srjson_AddNumberToObject(&jdoc, jdoc.root, "last_modified", ptr->last_modified);
+	srjson_AddNumberToObject(&jdoc, jdoc.root, "methods", ptr->methods);
+	srjson_AddNumberToObject(&jdoc, jdoc.root, "reg_id", ptr->reg_id);
+
+	jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root);
+	if(jdoc.buf.s==NULL) {
+		LM_ERR("unable to serialize data\n");
+		goto error;
+	}
+	jdoc.buf.len = strlen(jdoc.buf.s);
+	
+	LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
+	if (usrloc_dmq_send(&jdoc.buf, node)!=0) {
+		goto error;
+	}
+
+	jdoc.free_fn(jdoc.buf.s);
+	jdoc.buf.s = NULL;
+	srjson_DestroyDoc(&jdoc);
+	return 0;
+
+error:
+	if(jdoc.buf.s!=NULL) {
+		jdoc.free_fn(jdoc.buf.s);
+		jdoc.buf.s = NULL;
+	}
+	srjson_DestroyDoc(&jdoc);
+	return -1;
+}
+
+int usrloc_dmq_resp_callback_f(struct sip_msg* msg, int code,
+                            dmq_node_t* node, void* param)
+{
+	LM_DBG("dmq response callback triggered [%p %d %p]\n", msg, code, param);
+	return 0;
+}
+
+void ul_cb_contact(ucontact_t* ptr, int type, void* param)
+{
+	str aor;
+
+		LM_DBG("Callback from usrloc with type=%d\n", type);
+		aor.s = ptr->aor->s;
+		aor.len = ptr->aor->len;
+		
+		if (!(ptr->flags & usrloc_dmq_flag)) {
+		
+			switch(type){
+				case UL_CONTACT_INSERT:
+											usrloc_dmq_send_contact(ptr, aor, DMQ_UPDATE, 0);
+										break;
+				case UL_CONTACT_UPDATE:
+											usrloc_dmq_send_contact(ptr, aor, DMQ_UPDATE, 0);
+										break;
+				case UL_CONTACT_DELETE:
+											//usrloc_dmq_send_contact(ptr, aor, DMQ_RM);
+											LM_DBG("Contact <%.*s> deleted\n", aor.len, aor.s);
+										break;
+				case UL_CONTACT_EXPIRE:
+											//usrloc_dmq_send_contact(ptr, aor, DMQ_UPDATE);
+											LM_DBG("Contact <%.*s> expired\n", aor.len, aor.s);
+										break;
+			}
+		} else {
+			LM_DBG("Contact recieved from DMQ... skip\n");
+		}
+}

+ 32 - 0
modules/usrloc_dmq/usrloc_sync.h

@@ -0,0 +1,32 @@
+#ifndef _DMQ_SYNC_USRLOC_H_
+#define _DMQ_SYNC_USRLOC_H_
+
+#include "../dmq/bind_dmq.h"
+#include "../../lib/srutils/srjson.h"
+#include "../../parser/msg_parser.h"
+#include "../../parser/parse_content.h"
+#include "../usrloc/usrloc.h"
+
+int usrloc_dmq_flag;
+
+extern dmq_api_t usrloc_dmqb;
+extern dmq_peer_t* usrloc_dmq_peer;
+extern dmq_resp_cback_t usrloc_dmq_resp_callback;
+extern rpc_export_t ul_rpc[];
+
+usrloc_api_t ul;
+
+typedef enum {
+	DMQ_NONE,
+	DMQ_UPDATE,
+	DMQ_RM,
+	DMQ_SYNC,
+} usrloc_dmq_action_t;
+
+int usrloc_dmq_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param);
+int usrloc_dmq_initialize();
+int usrloc_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* node);
+int usrloc_dmq_request_sync();
+void ul_cb_contact(ucontact_t* c, int type, void* param);
+
+#endif