123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499 |
- /**
- * $Id$
- *
- * Copyright (C) 2010 Elena-Ramona Modroiu (asipto.com)
- *
- * This file is part of Kamailio, a free SIP server.
- *
- * This file is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version
- *
- * This file is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
- *
- */
- #include <stdio.h>
- #include <unistd.h>
- #include <stdlib.h>
- #include <string.h>
- #include "../../dprint.h"
- #include "../../mem/mem.h"
- #include "../../mem/shm_mem.h"
- #include "../../parser/parse_param.h"
- #include "../../ut.h"
- #include "../../shm_init.h"
- #include "../../lib/kcore/faked_msg.h"
- #include "mqueue_api.h"
- /**
- *
- */
- typedef struct _mq_item
- {
- str key;
- str val;
- struct _mq_item *prev;
- struct _mq_item *next;
- } mq_item_t;
- /**
- *
- */
- typedef struct _mq_head
- {
- str name;
- int msize;
- int csize;
- gen_lock_t lock;
- mq_item_t *ifirst;
- mq_item_t *ilast;
- struct _mq_head *next;
- } mq_head_t;
- /**
- *
- */
- typedef struct _mq_pv
- {
- str *name;
- mq_item_t *item;
- struct _mq_pv *next;
- } mq_pv_t;
- /**
- *
- */
- static mq_head_t *_mq_head_list = NULL;
- /**
- *
- */
- static mq_pv_t *_mq_pv_list = NULL;
- /**
- *
- */
- int mq_head_defined(void)
- {
- if(_mq_head_list!=NULL)
- return 1;
- return 0;
- }
- /**
- *
- */
- void mq_destroy(void)
- {
- mq_head_t *mh = NULL;
- mq_pv_t *mp = NULL;
- mq_item_t *mi = NULL;
- mq_head_t *mh1 = NULL;
- mq_pv_t *mp1 = NULL;
- mq_item_t *mi1 = NULL;
-
- mh = _mq_head_list;
- while(mh!=NULL)
- {
- mi = mh->ifirst;
- while(mi!=NULL)
- {
- mi1 = mi;
- mi = mi->next;
- shm_free(mi1);
- }
- mh1 = mh;
- mh = mh->next;
- lock_destroy(&mh1->lock);
- shm_free(mh1);
- }
- _mq_head_list = 0;
- mp = _mq_pv_list;
- while(mp!=NULL)
- {
- mp1 = mp;
- mp = mp->next;
- pkg_free(mp1);
- }
- }
- /**
- *
- */
- int mq_head_add(str *name, int msize)
- {
- mq_head_t *mh = NULL;
- mq_pv_t *mp = NULL;
- int len;
- if(!shm_initialized())
- {
- LM_ERR("shm not intialized - cannot define mqueue now\n");
- return 0;
- }
- mh = _mq_head_list;
- while(mh!=NULL)
- {
- if(name->len == mh->name.len
- && strncmp(mh->name.s, name->s, name->len)==0)
- {
- LM_ERR("mqueue redefined: %.*s\n", name->len, name->s);
- return -1;
- }
- mh = mh->next;
- }
- mp = (mq_pv_t*)pkg_malloc(sizeof(mq_pv_t));
- if(mp==NULL)
- {
- LM_ERR("no more pkg for: %.*s\n", name->len, name->s);
- return -1;
- }
- memset(mp, 0, sizeof(mq_pv_t));
- len = sizeof(mq_head_t) + name->len + 1;
- mh = (mq_head_t*)shm_malloc(len);
- if(mh==NULL)
- {
- LM_ERR("no more shm for: %.*s\n", name->len, name->s);
- pkg_free(mp);
- return -1;
- }
- memset(mh, 0, len);
- if (lock_init(&mh->lock)==0 )
- {
- LM_CRIT("failed to init lock\n");
- pkg_free(mp);
- shm_free(mh);
- return -1;
- }
- mh->name.s = (char*)mh + sizeof(mq_head_t);
- memcpy(mh->name.s, name->s, name->len);
- mh->name.len = name->len;
- mh->name.s[name->len] = '\0';
- mh->msize = msize;
- mh->next = _mq_head_list;
- _mq_head_list = mh;
- mp->name = &mh->name;
- mp->next = _mq_pv_list;
- _mq_pv_list = mp;
- return 0;
- }
- /**
- *
- */
- mq_head_t *mq_head_get(str *name)
- {
- mq_head_t *mh = NULL;
- mh = _mq_head_list;
- while(mh!=NULL)
- {
- if(name->len == mh->name.len
- && strncmp(mh->name.s, name->s, name->len)==0)
- {
- return mh;
- }
- mh = mh->next;
- }
- return NULL;
- }
- /**
- *
- */
- mq_pv_t *mq_pv_get(str *name)
- {
- mq_pv_t *mp = NULL;
- mp = _mq_pv_list;
- while(mp!=NULL)
- {
- if(mp->name->len==name->len
- && strncmp(mp->name->s, name->s, name->len)==0)
- return mp;
- mp = mp->next;
- }
- return NULL;
- }
- /**
- *
- */
- int mq_head_fetch(str *name)
- {
- mq_head_t *mh = NULL;
- mq_pv_t *mp = NULL;
- mp = mq_pv_get(name);
- if(mp==NULL)
- return -1;
- if(mp->item!=NULL)
- {
- shm_free(mp->item);
- mp->item = NULL;
- }
- mh = mq_head_get(name);
- if(mh==NULL)
- return -1;
- lock_get(&mh->lock);
- if(mh->ifirst==NULL)
- {
- /* empty queue */
- lock_release(&mh->lock);
- return -2;
- }
- mp->item = mh->ifirst;
- mh->ifirst = mh->ifirst->next;
- if(mh->ifirst==NULL) {
- mh->ilast = NULL;
- } else {
- mh->ifirst->prev = NULL;
- }
- mh->csize--;
- lock_release(&mh->lock);
- return 0;
- }
- /**
- *
- */
- void mq_pv_free(str *name)
- {
- mq_pv_t *mp = NULL;
- mp = mq_pv_get(name);
- if(mp==NULL)
- return;
- if(mp->item!=NULL)
- {
- shm_free(mp->item);
- mp->item = NULL;
- }
- }
- /**
- *
- */
- int mq_item_add(str *qname, str *key, str *val)
- {
- mq_head_t *mh = NULL;
- mq_item_t *mi = NULL;
- int len;
- mh = mq_head_get(qname);
- if(mh==NULL)
- {
- LM_ERR("mqueue not found: %.*s\n", qname->len, qname->s);
- return -1;
- }
- len = sizeof(mq_item_t) + key->len + val->len + 2;
- mi = (mq_item_t*)shm_malloc(len);
- if(mi==NULL)
- {
- LM_ERR("no more shm to add to: %.*s\n", qname->len, qname->s);
- return -1;
- }
- memset(mi, 0, len);
- mi->key.s = (char*)mi + sizeof(mq_item_t);
- memcpy(mi->key.s, key->s, key->len);
- mi->key.len = key->len;
- mi->key.s[key->len] = '\0';
-
- mi->val.s = mi->key.s + mi->key.len + 1;
- memcpy(mi->val.s, val->s, val->len);
- mi->val.len = val->len;
- mi->val.s[val->len] = '\0';
-
- lock_get(&mh->lock);
- if(mh->ifirst==NULL)
- {
- mh->ifirst = mi;
- mh->ilast = mi;
- } else {
- mh->ilast->next = mi;
- mi->prev = mh->ilast;
- mh->ilast = mi;
- }
- mh->csize++;
- if(mh->msize>0 && mh->csize>mh->msize)
- {
- mi = mh->ifirst;
- mh->ifirst = mh->ifirst->next;
- if(mh->ifirst==NULL)
- mh->ilast = NULL;
- else
- mh->ifirst->prev = NULL;
- mh->csize--;
- shm_free(mi);
- }
- lock_release(&mh->lock);
- return 0;
- }
- /**
- *
- */
- int pv_parse_mq_name(pv_spec_t *sp, str *in)
- {
- sp->pvp.pvn.u.isname.name.s = *in;
- sp->pvp.pvn.type = PV_NAME_INTSTR;
- sp->pvp.pvn.u.isname.type = 1;
- return 0;
- }
- str *pv_get_mq_name(sip_msg_t *msg, str *in)
- {
- static str queue;
- pv_spec_t *pvs;
- pv_value_t pvv;
- if (in->s[0] != '$')
- return in;
- else
- {
- if (pv_locate_name(in) != in->len)
- {
- LM_ERR("invalid pv [%.*s]\n", in->len, in->s);
- return NULL;
- }
- if ((pvs = pv_cache_get(in)) == NULL)
- {
- LM_ERR("failed to get pv spec for [%.*s]\n", in->len, in->s);
- return NULL;
- }
- memset(&pvv, 0, sizeof(pv_value_t));
- if (msg==NULL && faked_msg_init() < 0)
- {
- LM_ERR("faked_msg_init() failed\n");
- return NULL;
- }
- if (pv_get_spec_value((msg)?msg:faked_msg_next(), pvs, &pvv) != 0)
- {
- LM_ERR("failed to get pv value for [%.*s]\n", in->len, in->s);
- return NULL;
- }
- queue = pvv.rs;
- }
- return &queue;
- }
- /**
- *
- */
- int pv_get_mqk(struct sip_msg *msg, pv_param_t *param,
- pv_value_t *res)
- {
- mq_pv_t *mp = NULL;
- str *in = pv_get_mq_name(msg, ¶m->pvn.u.isname.name.s);
- if (in == NULL)
- {
- LM_ERR("failed to get mq name\n");
- return -1;
- }
- if (mq_head_get(in) == NULL)
- {
- LM_ERR("mqueue not found: %.*s\n", in->len, in->s);
- return -1;
- }
- mp = mq_pv_get(in);
- if(mp==NULL || mp->item==NULL || mp->item->key.len<=0)
- return pv_get_null(msg, param, res);
- return pv_get_strval(msg, param, res, &mp->item->key);
- }
- /**
- *
- */
- int pv_get_mqv(struct sip_msg *msg, pv_param_t *param,
- pv_value_t *res)
- {
- mq_pv_t *mp = NULL;
- str *in = pv_get_mq_name(msg, ¶m->pvn.u.isname.name.s);
- if (in == NULL)
- {
- LM_ERR("failed to get mq name\n");
- return -1;
- }
- if (mq_head_get(in) == NULL)
- {
- LM_ERR("mqueue not found: %.*s\n", in->len, in->s);
- return -1;
- }
- mp = mq_pv_get(in);
- if(mp==NULL || mp->item==NULL || mp->item->val.len<=0)
- return pv_get_null(msg, param, res);
- return pv_get_strval(msg, param, res, &mp->item->val);
- }
- /**
- *
- */
- int pv_get_mq_size(struct sip_msg *msg, pv_param_t *param,
- pv_value_t *res)
- {
- int mqs = -1;
- str *in = pv_get_mq_name(msg, ¶m->pvn.u.isname.name.s);
- if (in == NULL)
- {
- LM_ERR("failed to get mq name\n");
- return -1;
- }
- mqs = _mq_get_csize(in);
- if (mqs < 0)
- {
- LM_ERR("mqueue not found: %.*s\n", in->len, in->s);
- return -1;
- }
- return pv_get_sintval(msg, param, res, mqs);
- }
- /**
- * Return head->csize for a given queue
- */
- int _mq_get_csize(str *name)
- {
- mq_head_t *mh = mq_head_get(name);
- int mqueue_size = 0;
- if(mh == NULL)
- return -1;
- lock_get(&mh->lock);
- mqueue_size = mh->csize;
- lock_release(&mh->lock);
- return mqueue_size;
- }
|