فهرست منبع

mqueue: new module offering memory queues

- for usage in cfg file
- many queues can be defined
- one item in queue is a pair (key, value), both string
- functions to add and consume items in the queue
- fetching the oldest item makes it available to pseudo-variables
- example of usage: send tasks from SIP workers to a timer process
  (e.g., defined by rtimer module) that consumes them. Can be some time
  expensive operation, like email notification, etc.
Elena-Ramona Modroiu 15 سال پیش
والد
کامیت
ceb69ce094

+ 12 - 0
modules/mqueue/Makefile

@@ -0,0 +1,12 @@
+# $Id$
+#
+# WARNING: do not run this directly, it should be run by the master Makefile
+
+include ../../Makefile.defs
+auto_gen=
+NAME=mqueue.so
+LIBS=
+
+DEFS+=-DOPENSER_MOD_INTERFACE
+
+include ../../Makefile.modules

+ 123 - 0
modules/mqueue/README

@@ -0,0 +1,123 @@
+mqueue Module
+
+Elena-Ramona Modroiu
+
+   asipto.com
+
+Edited by
+
+Elena-Ramona Modroiu
+
+   <[email protected]>
+
+   Copyright © 2010 Elena-Ramona Modroiu (asipto.com)
+     __________________________________________________________________
+
+   Table of Contents
+
+   1. Admin Guide
+
+        1.1. Overview
+        1.2. Dependencies
+
+              1.2.1. Kamailio Modules
+              1.2.2. External Libraries or Applications
+
+        1.3. Exported Parameters
+
+              1.3.1. mqueue (string)
+
+        1.4. Exported Functions
+
+              1.4.1. mq_add(queue, key, value)
+              1.4.2. mq_fetch(queue)
+              1.4.3. mq_pv(queue)
+
+   List of Examples
+
+   1.1. Set mqueue parameter
+   1.2. mq_add usage
+   1.3. mq_fetch usage
+   1.4. mq_pv_free usage
+
+Chapter 1. Admin Guide
+
+1.1. Overview
+
+   This module offers generic message queue system in shared memory for
+   inter-process communication via config file. One example of usage is to
+   send time consuming operations to a timer process that consumes items
+   in the queue, without affecting SIP message handling.
+
+   There can be defined many queues, access to values being done via
+   pseudo variables.
+
+1.2. Dependencies
+
+1.2.1. Kamailio Modules
+
+   The following modules must be loaded before this module:
+     * None.
+
+1.2.2. External Libraries or Applications
+
+   The following libraries or applications must be installed before
+   running Kamailio with this module loaded:
+     * None.
+
+1.3. Exported Parameters
+
+1.3.1. mqueue (string)
+
+   Definition of memory queue
+
+   Default value is "none".
+
+   Value must be a list of parameters: attr=value;... The attribute 'name'
+   is mandatory, defining the name of the queue. Optional attribute 'size'
+   specifies the maximum number of items in queue, if it is execeeded the
+   oldest one is removed.
+
+   The parameter can be set many time, each holding the definition of one
+   queue.
+
+   Example 1.1. Set mqueue parameter
+...
+modparam("mqueue", "mqueue", "name=myq;size=20;")
+modparam("mqueue", "mqueue", "name=qaz")
+...
+
+1.4. Exported Functions
+
+1.4.1.  mq_add(queue, key, value)
+
+   Add a new item (key, value) in the queue. If max size of queue is
+   exceeded, the oldest one is removed.
+
+   Example 1.2. mq_add usage
+...
+mq_add("myq", "$rU", "call from $fU");
+...
+
+1.4.2.  mq_fetch(queue)
+
+   Take oldest item from que and fill $mqk(queue) and $mqv(queue) pseudo
+   variables. Return true on success.
+
+   Example 1.3. mq_fetch usage
+...
+while(mq_add("myq"))
+{
+   xlog("$mqk(myq) - $mqv(myq)\n");
+}
+...
+
+1.4.3.  mq_pv(queue)
+
+   Free the item fetched in pseudo-variables. It is optional, a new fetch
+   frees the old values.
+
+   Example 1.4. mq_pv_free usage
+...
+mq_pv_free("myq");
+...

+ 4 - 0
modules/mqueue/doc/Makefile

@@ -0,0 +1,4 @@
+docs = mqueue.xml
+
+docbook_dir = ../../../docbook
+include $(docbook_dir)/Makefile.module

+ 36 - 0
modules/mqueue/doc/mqueue.xml

@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding='ISO-8859-1'?>
+<!DOCTYPE book PUBLIC "-//OASIS//DTD DocBook XML V4.4//EN"
+"http://www.oasis-open.org/docbook/xml/4.4/docbookx.dtd" [
+
+<!-- Include general documentation entities -->
+<!ENTITY % docentities SYSTEM "../../../docbook/entities.xml">
+%docentities;
+
+]>
+
+<book xmlns:xi="http://www.w3.org/2001/XInclude">
+    <bookinfo>
+	<title>mqueue Module</title>
+	<productname class="trade">&kamailioname;</productname>
+	<authorgroup>
+	    <author>
+		<firstname>Elena-Ramona</firstname>
+		<surname>Modroiu</surname>
+		<affiliation><orgname>asipto.com</orgname></affiliation>
+	    </author>
+	    <editor>
+		<firstname>Elena-Ramona</firstname>
+		<surname>Modroiu</surname>
+		    <email>[email protected]</email>
+	    </editor>
+	</authorgroup>
+	<copyright>
+	    <year>2010</year>
+	    <holder>Elena-Ramona Modroiu (asipto.com)</holder>
+	</copyright>
+    </bookinfo>
+    <toc></toc>
+    
+	<xi:include href="mqueue_admin.xml"/>
+    
+</book>

+ 158 - 0
modules/mqueue/doc/mqueue_admin.xml

@@ -0,0 +1,158 @@
+<?xml version="1.0" encoding='ISO-8859-1'?>
+<!DOCTYPE book PUBLIC "-//OASIS//DTD DocBook XML V4.4//EN"
+"http://www.oasis-open.org/docbook/xml/4.4/docbookx.dtd" [
+
+<!-- Include general documentation entities -->
+<!ENTITY % docentities SYSTEM "../../../docbook/entities.xml">
+%docentities;
+
+]>
+
+<!-- Module User's Guide -->
+
+<chapter>
+    
+    <title>&adminguide;</title>
+    
+    <section>
+	<title>Overview</title>
+	<para>
+		This module offers generic message queue system in shared memory for
+		inter-process communication via config file. One example of usage is
+		to send time consuming operations to a timer process that consumes
+		items in the queue, without affecting SIP message handling.
+	</para>
+	<para>
+		There can be defined many queues, access to values being done via
+		pseudo variables.
+	</para>
+    </section>
+    <section>
+	<title>Dependencies</title>
+	<section>
+	    <title>&kamailio; Modules</title>
+	    <para>
+		The following modules must be loaded before this module:
+	    	<itemizedlist>
+		    <listitem>
+			<para>
+			    <emphasis>None</emphasis>.
+			</para>
+		    </listitem>
+	    	</itemizedlist>
+	    </para>
+	</section>
+	<section>
+	    <title>External Libraries or Applications</title>
+	    <para>
+		The following libraries or applications must be installed before running
+		&kamailio; with this module loaded:
+	    	<itemizedlist>
+		    <listitem>
+			<para>
+			    <emphasis>None</emphasis>.
+			</para>
+		    </listitem>
+	    	</itemizedlist>
+	    </para>
+	</section>
+    </section>
+    <section>
+	<title>Exported Parameters</title>
+
+	<section>
+	    <title><varname>mqueue</varname> (string)</title>
+	    <para>
+		Definition of memory queue
+	    </para>
+	    <para>
+		<emphasis>
+		    Default value is <quote>none</quote>.
+		</emphasis>
+	    </para>
+		<para>
+		Value must be a list of parameters: attr=value;... The attribute 'name'
+		is mandatory, defining the name of the queue. Optional attribute 'size'
+		specifies the maximum number of items in queue, if it is execeeded the
+		oldest one is removed.
+	    </para>
+		<para>
+		The parameter can be set many time, each holding the definition of one
+		queue.
+	    </para>
+	    <example>
+		<title>Set <varname>mqueue</varname> parameter</title>
+		<programlisting format="linespecific">
+...
+modparam("mqueue", "mqueue", "name=myq;size=20;")
+modparam("mqueue", "mqueue", "name=qaz")
+...
+</programlisting>
+	    </example>
+	</section>
+
+	</section>
+	
+    <section>
+	<title>Exported Functions</title>
+ 	<section>
+	    <title>
+		<function moreinfo="none">mq_add(queue, key, value)</function>
+	    </title>
+	    <para>
+		Add a new item (key, value) in the queue. If max size of queue is
+		exceeded, the oldest one is removed.
+	    </para>
+		<example>
+		<title><function>mq_add</function> usage</title>
+		<programlisting format="linespecific">
+...
+mq_add("myq", "$rU", "call from $fU");
+...
+</programlisting>
+	    </example>
+	</section>
+	
+ 	<section>
+	    <title>
+		<function moreinfo="none">mq_fetch(queue)</function>
+	    </title>
+	    <para>
+		Take oldest item from que and fill $mqk(queue) and $mqv(queue) pseudo
+		variables. Return true on success.
+	    </para>
+		<example>
+		<title><function>mq_fetch</function> usage</title>
+		<programlisting format="linespecific">
+...
+while(mq_add("myq"))
+{
+   xlog("$mqk(myq) - $mqv(myq)\n");
+}
+...
+</programlisting>
+	    </example>
+	</section>
+	
+ 	<section>
+	    <title>
+		<function moreinfo="none">mq_pv(queue)</function>
+	    </title>
+	    <para>
+		Free the item fetched in pseudo-variables. It is optional, a new fetch
+		frees the old values.
+	    </para>
+		<example>
+		<title><function>mq_pv_free</function> usage</title>
+		<programlisting format="linespecific">
+...
+mq_pv_free("myq");
+...
+</programlisting>
+	    </example>
+	</section>
+	
+    </section>
+	
+</chapter>
+

+ 408 - 0
modules/mqueue/mqueue_api.c

@@ -0,0 +1,408 @@
+/**
+ * $Id$
+ *
+ * Copyright (C) 2010 Elena-Ramona Modroiu (asipto.com)
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * This file is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * This file is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "../../dprint.h"
+#include "../../mem/mem.h"
+#include "../../mem/shm_mem.h"
+#include "../../parser/parse_param.h"
+#include "../../ut.h"
+#include "../../shm_init.h"
+
+#include "mqueue_api.h"
+
+/**
+ *
+ */
+typedef struct _mq_item
+{
+	str key;
+	str val;
+	struct _mq_item *prev;
+	struct _mq_item *next;
+} mq_item_t;
+
+/**
+ *
+ */
+typedef struct _mq_head
+{
+	str name;
+	int msize;
+	int csize;
+	gen_lock_t lock;
+	mq_item_t *ifirst;
+	mq_item_t *ilast;
+	struct _mq_head *next;
+} mq_head_t;
+
+/**
+ *
+ */
+typedef struct _mq_pv
+{
+	str *name;
+	mq_item_t *item;
+	struct _mq_pv *next;
+} mq_pv_t;
+
+/**
+ *
+ */
+static mq_head_t *_mq_head_list = NULL;
+
+/**
+ *
+ */
+static mq_pv_t *_mq_pv_list = NULL;
+
+/**
+ *
+ */
+int mq_head_defined(void)
+{
+	if(_mq_head_list!=NULL)
+		return 1;
+	return 0;
+}
+
+/**
+ *
+ */
+void mq_destroy(void)
+{
+	mq_head_t *mh = NULL;
+	mq_pv_t *mp = NULL;
+	mq_item_t *mi = NULL;
+	mq_head_t *mh1 = NULL;
+	mq_pv_t *mp1 = NULL;
+	mq_item_t *mi1 = NULL;
+	
+	mh = _mq_head_list;
+	while(mh!=NULL)
+	{
+		mi = mh->ifirst;
+		while(mi!=NULL)
+		{
+			mi1 = mi;
+			mi = mi->next;
+			shm_free(mi1);
+		}
+		mh1 = mh;
+		mh = mh->next;
+		lock_destroy(&mh1->lock);
+		shm_free(mh1);
+	}
+	_mq_head_list = 0;
+	mp = _mq_pv_list;
+	while(mp!=NULL)
+	{
+		mp1 = mp;
+		mp = mp->next;
+		pkg_free(mp1);
+	}
+}
+
+/**
+ *
+ */
+int mq_head_add(str *name, int msize)
+{
+	mq_head_t *mh = NULL;
+	mq_pv_t *mp = NULL;
+	int len;
+
+	if(!shm_initialized())
+	{
+		LM_ERR("shm not intialized - cannot define mqueue now\n");
+		return 0;
+	}
+
+	mh = _mq_head_list;
+	while(mh!=NULL)
+	{
+		if(name->len == mh->name.len
+				&& strncmp(mh->name.s, name->s, name->len)==0)
+		{
+			LM_ERR("mqueue redefined: %.*s\n", name->len, name->s);
+			return -1;
+		}
+		mh = mh->next;
+	}
+
+	mp = (mq_pv_t*)pkg_malloc(sizeof(mq_pv_t));
+	if(mp==NULL)
+	{
+		LM_ERR("no more pkg for: %.*s\n", name->len, name->s);
+		return -1;
+	}
+	memset(mp, 0, sizeof(mq_pv_t));
+
+	len = sizeof(mq_head_t) + name->len + 1;
+	mh = (mq_head_t*)shm_malloc(len);
+	if(mh==NULL)
+	{
+		LM_ERR("no more shm for: %.*s\n", name->len, name->s);
+		pkg_free(mp);
+		return -1;
+	}
+	memset(mh, 0, len);
+	if (lock_init(&mh->lock)==0 )
+	{
+		LM_CRIT("failed to init lock\n");
+		pkg_free(mp);
+		shm_free(mh);
+		return -1;
+	}
+
+	mh->name.s = (char*)mh + sizeof(mq_head_t);
+	memcpy(mh->name.s, name->s, name->len);
+	mh->name.len = name->len;
+	mh->name.s[name->len] = '\0';
+	mh->msize = msize;
+	mh->next = _mq_head_list;
+	_mq_head_list = mh;
+
+	mp->name = &mh->name;
+	mp->next = _mq_pv_list;
+	_mq_pv_list = mp;
+
+	return 0;
+}
+
+/**
+ *
+ */
+mq_head_t *mq_head_get(str *name)
+{
+	mq_head_t *mh = NULL;
+
+	mh = _mq_head_list;
+	while(mh!=NULL)
+	{
+		if(name->len == mh->name.len
+				&& strncmp(mh->name.s, name->s, name->len)==0)
+		{
+			return mh;
+		}
+		mh = mh->next;
+	}
+	return NULL;
+}
+
+/**
+ *
+ */
+mq_pv_t *mq_pv_get(str *name)
+{
+	mq_pv_t *mp = NULL;
+
+	mp = _mq_pv_list;
+	while(mp!=NULL)
+	{
+		if(mp->name->len==name->len
+				&& strncmp(mp->name->s, name->s, name->len)==0)
+			return mp;
+		mp = mp->next;
+	}
+	return NULL;
+}
+
+/**
+ *
+ */
+int mq_head_fetch(str *name)
+{
+	mq_head_t *mh = NULL;
+	mq_pv_t *mp = NULL;
+
+	mp = mq_pv_get(name);
+	if(mp==NULL)
+		return -1;
+	if(mp->item!=NULL)
+	{
+		shm_free(mp->item);
+		mp->item = NULL;
+	}
+	mh = mq_head_get(name);
+	if(mh==NULL)
+		return -1;
+	lock_get(&mh->lock);
+
+	if(mh->ifirst!=NULL)
+	{
+		mp->item = mh->ifirst;
+		mh->ifirst = mh->ifirst->next;
+		if(mh->ifirst==NULL) {
+			mh->ilast = NULL;
+		} else {
+			mh->ifirst->prev = NULL;
+		}
+		mh->csize--;
+	}
+
+	lock_release(&mh->lock);
+	return 0;
+}
+
+/**
+ *
+ */
+void mq_pv_free(str *name)
+{
+	mq_pv_t *mp = NULL;
+
+	mp = mq_pv_get(name);
+	if(mp==NULL)
+		return;
+	if(mp->item!=NULL)
+	{
+		shm_free(mp->item);
+		mp->item = NULL;
+	}
+}
+
+/**
+ *
+ */
+int mq_item_add(str *qname, str *key, str *val)
+{
+	mq_head_t *mh = NULL;
+	mq_item_t *mi = NULL;
+	int len;
+
+	mh = mq_head_get(qname);
+	if(mh==NULL)
+	{
+		LM_ERR("mqueue not found: %.*s\n", qname->len, qname->s);
+		return -1;
+	}
+	len = sizeof(mq_item_t) + key->len + val->len + 2;
+	mi = (mq_item_t*)shm_malloc(len);
+	if(mi==NULL)
+	{
+		LM_ERR("no more shm to add to: %.*s\n", qname->len, qname->s);
+		return -1;
+	}
+	memset(mi, 0, len);
+	mi->key.s = (char*)mi + sizeof(mq_item_t);
+	memcpy(mi->key.s, key->s, key->len);
+	mi->key.len = key->len;
+	mi->key.s[key->len] = '\0';
+	
+	mi->val.s = mi->key.s + mi->key.len + 1;
+	memcpy(mi->val.s, val->s, val->len);
+	mi->val.len = val->len;
+	mi->val.s[val->len] = '\0';
+	
+	lock_get(&mh->lock);
+	if(mh->ifirst==NULL)
+	{
+		mh->ifirst = mi;
+		mh->ilast = mi;
+	} else {
+		mh->ilast->next = mi;
+		mi->prev = mh->ilast;
+		mh->ilast = mi;
+	}
+	mh->csize++;
+	if(mh->msize>0 && mh->csize>mh->msize)
+	{
+		mi = mh->ifirst;
+		mh->ifirst = mh->ifirst->next;
+		if(mh->ifirst==NULL)
+			mh->ilast = NULL;
+		else
+			mh->ifirst->prev = NULL;
+	}
+	lock_release(&mh->lock);
+	return 0;
+}
+
+/**
+ *
+ */
+int pv_parse_mqk_name(pv_spec_t *sp, str *in)
+{
+	mq_head_t *mh = NULL;
+	mh = mq_head_get(in);
+	if(mh==NULL)
+	{
+		LM_ERR("mqueue not found: %.*s\n", in->len, in->s);
+		return -1;
+	}
+	sp->pvp.pvn.u.isname.name.s = *in;
+	sp->pvp.pvn.type = PV_NAME_INTSTR;
+	sp->pvp.pvn.u.isname.type = 1;
+	return 0;
+}
+
+/**
+ *
+ */
+int pv_get_mqk(struct sip_msg *msg, pv_param_t *param,
+		pv_value_t *res)
+{
+	mq_pv_t *mp = NULL;
+	mp = mq_pv_get(&param->pvn.u.isname.name.s);
+	if(mp==NULL || mp->item==NULL || mp->item->key.len<=0)
+		return pv_get_null(msg, param, res);
+	return pv_get_strval(msg, param, res, &mp->item->key);
+}
+
+/**
+ *
+ */
+int pv_parse_mqv_name(pv_spec_t *sp, str *in)
+{
+	mq_head_t *mh = NULL;
+	mh = mq_head_get(in);
+	if(mh==NULL)
+	{
+		LM_ERR("mqueue not found: %.*s\n", in->len, in->s);
+		return -1;
+	}
+	sp->pvp.pvn.u.isname.name.s = *in;
+	sp->pvp.pvn.type = PV_NAME_INTSTR;
+	sp->pvp.pvn.u.isname.type = 1;
+	return 0;
+}
+
+/**
+ *
+ */
+int pv_get_mqv(struct sip_msg *msg, pv_param_t *param,
+		pv_value_t *res)
+{
+	mq_pv_t *mp = NULL;
+	mp = mq_pv_get(&param->pvn.u.isname.name.s);
+	if(mp==NULL || mp->item==NULL || mp->item->val.len<=0)
+		return pv_get_null(msg, param, res);
+	return pv_get_strval(msg, param, res, &mp->item->val);
+}
+

+ 46 - 0
modules/mqueue/mqueue_api.h

@@ -0,0 +1,46 @@
+/**
+ * $Id$
+ *
+ * Copyright (C) 2010 Elena-Ramona Modroiu (asipto.com)
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * This file is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * This file is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+		       
+#ifndef _MQUEUE_API_H_
+#define _MQUEUE_API_H_
+
+#include "../../pvar.h"
+#include "../../parser/msg_parser.h"
+
+int pv_parse_mqk_name(pv_spec_p sp, str *in);
+int pv_get_mqk(struct sip_msg *msg, pv_param_t *param,
+		pv_value_t *res);
+int pv_parse_mqv_name(pv_spec_p sp, str *in);
+int pv_get_mqv(struct sip_msg *msg, pv_param_t *param,
+		pv_value_t *res);
+
+int mq_head_defined(void);
+void mq_destroy(void);
+int mq_head_add(str *name, int msize);
+int mq_head_fetch(str *name);
+void mq_pv_free(str *name);
+int mq_item_add(str *qname, str *key, str *val);
+
+#endif
+

+ 208 - 0
modules/mqueue/mqueue_mod.c

@@ -0,0 +1,208 @@
+/**
+ * $Id$
+ *
+ * Copyright (C) 2010 Elena-Ramona Modroiu (asipto.com)
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * This file is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ *
+ * This file is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "../../sr_module.h"
+#include "../../dprint.h"
+#include "../../ut.h"
+#include "../../pvar.h"
+#include "../../mod_fix.h"
+#include "../../parser/parse_param.h"
+#include "../../shm_init.h"
+
+#include "mqueue_api.h"
+
+MODULE_VERSION
+
+static int  mod_init(void);
+static void mod_destroy(void);
+
+static int w_mq_fetch(struct sip_msg* msg, char* mq, char* str2);
+static int w_mq_add(struct sip_msg* msg, char* mq, char* key, char* val);
+static int w_mq_pv_free(struct sip_msg* msg, char* mq, char* str2);
+int mq_param(modparam_t type, void *val);
+static int fixup_mq_add(void** param, int param_no);
+
+static pv_export_t mod_pvs[] = {
+	{ {"mqk", sizeof("mqk")-1}, PVT_OTHER, pv_get_mqk, 0,
+		pv_parse_mqk_name, 0, 0, 0 },
+	{ {"mqv", sizeof("mqv")-1}, PVT_OTHER, pv_get_mqv, 0,
+		pv_parse_mqv_name, 0, 0, 0 },
+	{ {0, 0}, 0, 0, 0, 0, 0, 0, 0 }
+};
+
+
+static cmd_export_t cmds[]={
+	{"mq_fetch", (cmd_function)w_mq_fetch, 1, fixup_str_null,
+		0, ANY_ROUTE},
+	{"mq_add", (cmd_function)w_mq_add, 3, fixup_mq_add,
+		0, ANY_ROUTE},
+	{"mq_pv_free", (cmd_function)w_mq_pv_free, 1, fixup_str_null,
+		0, ANY_ROUTE},
+	{0, 0, 0, 0, 0, 0}
+};
+
+static param_export_t params[]={
+	{"mqueue",          STR_PARAM|USE_FUNC_PARAM, (void*)mq_param},
+	{0, 0, 0}
+};
+
+struct module_exports exports = {
+	"mqueue",
+	DEFAULT_DLFLAGS, /* dlopen flags */
+	cmds,
+	params,
+	0,
+	0,              /* exported MI functions */
+	mod_pvs,        /* exported pseudo-variables */
+	0,              /* extra processes */
+	mod_init,       /* module initialization function */
+	0,              /* response function */
+	mod_destroy,    /* destroy function */
+	0               /* per child init function */
+};
+
+
+
+/**
+ * init module function
+ */
+static int mod_init(void)
+{
+	if(!mq_head_defined())
+		LM_WARN("no mqueue defined\n");
+	return 0;
+}
+
+/**
+ * destroy module function
+ */
+static void mod_destroy(void)
+{
+	mq_destroy();
+}
+
+static int w_mq_fetch(struct sip_msg* msg, char* mq, char* str2)
+{
+	if(mq_head_fetch((str*)mq)<0)
+		return -1;
+	return 1;
+}
+
+static int w_mq_add(struct sip_msg* msg, char* mq, char* key, char* val)
+{
+	str qkey;
+	str qval;
+	if(fixup_get_svalue(msg, (gparam_t*)key, &qkey)<0)
+	{
+		LM_ERR("cannot get the key\n");
+		return -1;
+	}
+	if(fixup_get_svalue(msg, (gparam_t*)val, &qval)<0)
+	{
+		LM_ERR("cannot get the val\n");
+		return -1;
+	}
+	if(mq_item_add((str*)mq, &qkey, &qval)<0)
+		return -1;
+	return 1;
+}
+
+static int w_mq_pv_free(struct sip_msg* msg, char* mq, char* str2)
+{
+	mq_pv_free((str*)mq);
+	return 1;
+}
+
+int mq_param(modparam_t type, void *val)
+{
+	str mqs;
+	param_t* params_list = NULL;
+	param_hooks_t phooks;
+	param_t *pit=NULL;
+	str qname = {0, 0};
+	int msize = 0;
+
+	if(val==NULL)
+		return -1;
+
+	if(!shm_initialized())
+	{
+		LM_ERR("shm not intialized - cannot define mqueue now\n");
+		return 0;
+	}
+
+	mqs.s = (char*)val;
+	mqs.len = strlen(mqs.s);
+	if(mqs.s[mqs.len-1]==';')
+		mqs.len--;
+	if (parse_params(&mqs, CLASS_ANY, &phooks, &params_list)<0)
+		return -1;
+	for (pit = params_list; pit; pit=pit->next)
+	{
+		if (pit->name.len==4
+				&& strncasecmp(pit->name.s, "name", 4)==0) {
+			qname = pit->body;
+		} else if(pit->name.len==4
+				&& strncasecmp(pit->name.s, "size", 4)==0) {
+			str2sint(&pit->body, &msize);
+		}  else {
+			LM_ERR("unknown param: %.*s\n", pit->name.len, pit->name.s);
+			free_params(params_list);
+			return -1;
+		}
+	}
+	if(qname.len<=0)
+	{
+		LM_ERR("mqueue name not defined: %.*s\n", mqs.len, mqs.s);
+		free_params(params_list);
+		return -1;
+	}
+	if(mq_head_add(&qname, msize)<0)
+	{
+		LM_ERR("cannot add mqueue: %.*s\n", mqs.len, mqs.s);
+		free_params(params_list);
+		return -1;
+	}
+	free_params(params_list);
+	return 0;
+}
+
+static int fixup_mq_add(void** param, int param_no)
+{
+    if(param_no==2 || param_no==3) {
+		return fixup_spve_null(param, 1);
+    }
+    if (param_no != 1)	{
+		LM_ERR("invalid parameter number %d\n", param_no);
+		return E_UNSPEC;
+    }
+    return fixup_str_null(param, 1);
+}
+
+