Przeglądaj źródła

async: new module for asynchronous config operations

- exports async_sleep(seconds) - sleep asynchronously and continue the
  exection after the seconds interval has passed
- it uses t_suspend()/t_continue()
- config execution state is lost, so a return at the same level with
  async_sleep() will exit the config execution
Daniel-Constantin Mierla 14 lat temu
rodzic
commit
58796a4ea5

+ 12 - 0
modules/async/Makefile

@@ -0,0 +1,12 @@
+# $Id$
+#
+# WARNING: do not run this directly, it should be run by the master Makefile
+
+include ../../Makefile.defs
+auto_gen=
+NAME=async.so
+LIBS=
+
+DEFS+=-DOPENSER_MOD_INTERFACE
+
+include ../../Makefile.modules

+ 171 - 0
modules/async/async_mod.c

@@ -0,0 +1,171 @@
+/**
+ * $Id$
+ *
+ * Copyright (C) 2011 Daniel-Constantin Mierla (asipto.com)
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * This file is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ *
+ * This file is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "../../sr_module.h"
+#include "../../dprint.h"
+#include "../../ut.h"
+#include "../../pvar.h"
+#include "../../timer_proc.h"
+#include "../../route_struct.h"
+#include "../../modules/tm/tm_load.h"
+
+#include "async_sleep.h"
+
+MODULE_VERSION
+
+static int async_workers = 1;
+
+static int  mod_init(void);
+static int  child_init(int);
+static void mod_destroy(void);
+
+static int w_async_sleep(struct sip_msg* msg, char* sec, char* str2);
+static int fixup_async_sleep(void** param, int param_no);
+
+/* tm */
+struct tm_binds tmb;
+
+
+static cmd_export_t cmds[]={
+	{"async_sleep", (cmd_function)w_async_sleep, 1, fixup_async_sleep,
+		0, REQUEST_ROUTE|FAILURE_ROUTE|BRANCH_ROUTE},
+	{0, 0, 0, 0, 0, 0}
+};
+
+static param_export_t params[]={
+	{"workers",     INT_PARAM,   &async_workers},
+	{0, 0, 0}
+};
+
+struct module_exports exports = {
+	"async",
+	DEFAULT_DLFLAGS, /* dlopen flags */
+	cmds,
+	params,
+	0,
+	0,              /* exported MI functions */
+	0,              /* exported pseudo-variables */
+	0,              /* extra processes */
+	mod_init,       /* module initialization function */
+	0,              /* response function */
+	mod_destroy,    /* destroy function */
+	child_init      /* per child init function */
+};
+
+
+
+/**
+ * init module function
+ */
+static int mod_init(void)
+{
+	if (load_tm_api( &tmb ) == -1)
+	{
+		LM_ERR("cannot load the TM-functions\n");
+		return -1;
+	}
+
+	if(async_init_timer_list()<0) {
+		LM_ERR("cannot initialize internal structure\n");
+		return -1;
+	}
+
+	register_dummy_timers(async_workers);
+
+	return 0;
+}
+
+/**
+ * @brief Initialize async module children
+ */
+static int child_init(int rank)
+{
+	if (rank!=PROC_MAIN)
+		return 0;
+
+	if(fork_dummy_timer(PROC_TIMER, "ASYNC MOD TIMER", 1 /*socks flag*/,
+				async_timer_exec, NULL, 1 /*sec*/)<0) {
+		LM_ERR("failed to register timer routine as process\n");
+		return -1; /* error */
+	}
+
+	return 0;
+}
+/**
+ * destroy module function
+ */
+static void mod_destroy(void)
+{
+	async_destroy_timer_list();
+}
+
+static int w_async_sleep(struct sip_msg* msg, char* sec, char* str2)
+{
+	int s;
+	async_param_t *ai;
+	
+	if(msg==NULL)
+		return -1;
+
+	ai = (async_param_t*)sec;
+	if(fixup_get_ivalue(msg, ai->pinterval, &s)!=0)
+	{
+		LM_ERR("no async sleep time value\n");
+		return -1;
+	}
+	if(ai->type==0)
+	{
+		if(async_sleep(msg, s, ai->u.paction)<0)
+			return -1;
+		/* force exit in config */
+		return 0;
+	}
+
+	return -1;
+}
+
+static int fixup_async_sleep(void** param, int param_no)
+{
+	async_param_t *ap;
+	if(param_no!=1)
+		return 0;
+	ap = (async_param_t*)pkg_malloc(sizeof(async_param_t));
+	if(ap==NULL)
+	{
+		LM_ERR("no more pkg\n");
+		return -1;
+	}
+	memset(ap, 0, sizeof(async_param_t));
+	ap->u.paction = get_action_from_param(param, param_no);
+	if(fixup_igp_null(param, param_no)<0)
+		return -1;
+	ap->pinterval = (gparam_t*)(*param);
+	*param = (void*)ap;
+	return 0;
+}

+ 187 - 0
modules/async/async_sleep.c

@@ -0,0 +1,187 @@
+/**
+ * $Id$
+ *
+ * Copyright (C) 2011 Daniel-Constantin Mierla (asipto.com)
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * This file is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ *
+ * This file is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "../../dprint.h"
+#include "../../ut.h"
+#include "../../locking.h"
+#include "../../timer.h"
+#include "../../modules/tm/tm_load.h"
+
+#include "async_sleep.h"
+
+/* tm */
+extern struct tm_binds tmb;
+
+typedef struct async_item {
+	unsigned int tindex;
+	unsigned int tlabel;
+	unsigned int ticks;
+	struct action *act;
+	struct async_item *next;
+} async_item_t;
+
+typedef struct async_slot {
+	async_item_t *lstart;
+	async_item_t *lend;
+	gen_lock_t lock;
+} async_slot_t;
+
+#define ASYNC_RING_SIZE	100
+
+static struct async_list_head {
+	async_slot_t ring[ASYNC_RING_SIZE];
+	async_slot_t *later;
+} *_async_list_head = NULL;
+
+int async_init_timer_list(void)
+{
+	int i;
+	_async_list_head = (struct async_list_head*)
+						shm_malloc(sizeof(struct async_list_head));
+	if(_async_list_head==NULL)
+	{
+		LM_ERR("no more shm\n");
+		return -1;
+	}
+	memset(_async_list_head, 0, sizeof(struct async_list_head));
+	for(i=0; i<ASYNC_RING_SIZE; i++)
+	{
+		if(lock_init(&_async_list_head->ring[i].lock)==0)
+		{
+			LM_ERR("cannot init lock at %d\n", i);
+			i--;
+			while(i>=0)
+			{
+				lock_destroy(&_async_list_head->ring[i].lock);
+				i--;
+			}
+			shm_free(_async_list_head);
+			_async_list_head = 0;
+
+			return -1;
+		}
+	}
+	return 0;
+}
+
+int async_destroy_timer_list(void)
+{
+	int i;
+	if(_async_list_head==NULL)
+		return 0;
+	for(i=0; i<ASYNC_RING_SIZE; i++)
+	{
+		/* TODO: clean the list */
+		lock_destroy(&_async_list_head->ring[i].lock);
+	}
+	shm_free(_async_list_head);
+	_async_list_head = 0;
+	return 0;
+}
+
+int async_sleep(struct sip_msg* msg, int seconds, struct action *act)
+{
+	int slot;
+	unsigned int ticks;
+	async_item_t *ai;
+	struct cell *t = 0;
+
+	if(seconds<=0) {
+		LM_ERR("negative or zero sleep time (%d)\n", seconds);
+		return -1;
+	}
+	if(seconds>=ASYNC_RING_SIZE)
+	{
+		LM_ERR("max sleep time is %d sec (%d)\n", ASYNC_RING_SIZE, seconds);
+		return -1;
+	}
+	t = tmb.t_gett();
+	if (t==NULL || t==T_UNDEFINED)
+	{
+		if(tmb.t_newtran(msg)<0)
+		{
+			LM_ERR("cannot create the transaction\n");
+			return -1;
+		}
+		t = tmb.t_gett();
+		if (t==NULL || t==T_UNDEFINED)
+		{
+			LM_ERR("cannot lookup the transaction\n");
+			return -1;
+		}
+	}
+
+	ticks = seconds + get_ticks();
+	slot = ticks % ASYNC_RING_SIZE;
+	ai = (async_item_t*)shm_malloc(sizeof(async_item_t));
+	if(ai==NULL)
+	{
+		LM_ERR("no more shm\n");
+		return -1;
+	}
+	memset(ai, 0, sizeof(async_item_t));
+	ai->ticks = ticks;
+	ai->act = act;
+	if(tmb.t_suspend(msg, &ai->tindex, &ai->tlabel)<0)
+	{
+		LM_ERR("failed to suppend the processing\n");
+		shm_free(ai);
+		return -1;
+	}
+	lock_get(&_async_list_head->ring[slot].lock);
+	ai->next = _async_list_head->ring[slot].lstart;
+	_async_list_head->ring[slot].lstart = ai;
+	lock_release(&_async_list_head->ring[slot].lock);
+
+	return 0;
+}
+
+void async_timer_exec(unsigned int ticks, void *param)
+{
+	int slot;
+	async_item_t *ai;
+
+	if(_async_list_head==NULL)
+		return;
+
+	slot = ticks % ASYNC_RING_SIZE;
+
+	while(1) {
+		lock_get(&_async_list_head->ring[slot].lock);
+		ai = _async_list_head->ring[slot].lstart;
+		if(ai!=NULL)
+			_async_list_head->ring[slot].lstart = ai->next;
+		lock_release(&_async_list_head->ring[slot].lock);
+
+		if(ai==NULL)
+			break;
+		if(ai->act!=NULL && ai->act->next!=NULL)
+			tmb.t_continue(ai->tindex, ai->tlabel, ai->act->next);
+	}
+}

+ 49 - 0
modules/async/async_sleep.h

@@ -0,0 +1,49 @@
+/**
+ * $Id$
+ *
+ * Copyright (C) 2011 Daniel-Constantin Mierla (asipto.com)
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * This file is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ *
+ * This file is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+#ifndef _ASYNC_SLEEP_H_
+#define _ASYNC_SLEEP_H_
+
+#include "../../parser/msg_parser.h"
+#include "../../route_struct.h"
+#include "../../mod_fix.h"
+
+typedef struct async_param {
+	int type;
+	gparam_t *pinterval;
+	union {
+		struct action *paction;
+		gparam_t *proute;
+	} u;
+} async_param_t;
+
+int async_init_timer_list(void);
+
+int async_destroy_timer_list(void);
+
+int async_sleep(struct sip_msg* msg, int seconds, struct action *act);
+
+void async_timer_exec(unsigned int ticks, void *param);
+
+#endif