فهرست منبع

async: new function async_task_route(rname)

- execute actions in a routing block via core async framework
Daniel-Constantin Mierla 11 سال پیش
والد
کامیت
cab6c3abed
3فایلهای تغییر یافته به همراه151 افزوده شده و 1 حذف شده
  1. 74 0
      modules/async/async_mod.c
  2. 74 0
      modules/async/async_sleep.c
  3. 3 1
      modules/async/async_sleep.h

+ 74 - 0
modules/async/async_mod.c

@@ -33,6 +33,7 @@
 #include "../../pvar.h"
 #include "../../timer_proc.h"
 #include "../../route_struct.h"
+#include "../../async_task.h"
 #include "../../modules/tm/tm_load.h"
 
 #include "async_sleep.h"
@@ -49,6 +50,8 @@ static int w_async_sleep(struct sip_msg* msg, char* sec, char* str2);
 static int fixup_async_sleep(void** param, int param_no);
 static int w_async_route(struct sip_msg* msg, char* rt, char* sec);
 static int fixup_async_route(void** param, int param_no);
+static int w_async_task_route(struct sip_msg* msg, char* rt, char* p2);
+static int fixup_async_task_route(void** param, int param_no);
 
 /* tm */
 struct tm_binds tmb;
@@ -59,6 +62,8 @@ static cmd_export_t cmds[]={
 		0, REQUEST_ROUTE|FAILURE_ROUTE},
 	{"async_sleep", (cmd_function)w_async_sleep, 1, fixup_async_sleep,
 		0, REQUEST_ROUTE|FAILURE_ROUTE},
+	{"async_task_route", (cmd_function)w_async_task_route, 1, fixup_async_task_route,
+		0, REQUEST_ROUTE|FAILURE_ROUTE},
 	{0, 0, 0, 0, 0, 0}
 };
 
@@ -129,6 +134,9 @@ static void mod_destroy(void)
 	async_destroy_timer_list();
 }
 
+/**
+ *
+ */
 static int w_async_sleep(struct sip_msg* msg, char* sec, char* str2)
 {
 	int s;
@@ -159,6 +167,9 @@ static int w_async_sleep(struct sip_msg* msg, char* sec, char* str2)
 	return -1;
 }
 
+/**
+ *
+ */
 static int fixup_async_sleep(void** param, int param_no)
 {
 	async_param_t *ap;
@@ -179,6 +190,9 @@ static int fixup_async_sleep(void** param, int param_no)
 	return 0;
 }
 
+/**
+ *
+ */
 static int w_async_route(struct sip_msg* msg, char* rt, char* sec)
 {
 	cfg_action_t *act;
@@ -220,6 +234,9 @@ static int w_async_route(struct sip_msg* msg, char* rt, char* sec)
 	return 0;
 }
 
+/**
+ *
+ */
 static int fixup_async_route(void** param, int param_no)
 {
 	if(param_no==1)
@@ -233,3 +250,60 @@ static int fixup_async_route(void** param, int param_no)
 	}
 	return 0;
 }
+
+/**
+ *
+ */
+static int w_async_task_route(struct sip_msg* msg, char* rt, char* sec)
+{
+	cfg_action_t *act;
+	str rn;
+	int ri;
+
+	if(msg==NULL)
+		return -1;
+
+	if(fixup_get_svalue(msg, (gparam_t*)rt, &rn)!=0)
+	{
+		LM_ERR("no async route block name\n");
+		return -1;
+	}
+
+	ri = route_get(&main_rt, rn.s);
+	if(ri<0)
+	{
+		LM_ERR("unable to find route block [%.*s]\n", rn.len, rn.s);
+		return -1;
+	}
+	act = main_rt.rlist[ri];
+	if(act==NULL)
+	{
+		LM_ERR("empty action lists in route block [%.*s]\n", rn.len, rn.s);
+		return -1;
+	}
+
+	if(async_send_task(msg, act)<0)
+		return -1;
+	/* force exit in config */
+	return 0;
+}
+
+/**
+ *
+ */
+static int fixup_async_task_route(void** param, int param_no)
+{
+	if(!async_task_initialized()) {
+		LM_ERR("async task framework was not initialized"
+				" - set async_workers parameter in core\n");
+		return -1;
+	}
+
+	if(param_no==1)
+	{
+		if(fixup_spve_null(param, 1)<0)
+			return -1;
+		return 0;
+	}
+	return 0;
+}

+ 74 - 0
modules/async/async_sleep.c

@@ -31,6 +31,7 @@
 #include "../../ut.h"
 #include "../../locking.h"
 #include "../../timer.h"
+#include "../../async_task.h"
 #include "../../modules/tm/tm_load.h"
 
 #include "async_sleep.h"
@@ -186,3 +187,76 @@ void async_timer_exec(unsigned int ticks, void *param)
 		shm_free(ai);
 	}
 }
+
+
+/**
+ *
+ */
+void async_exec_task(void *param)
+{
+	cfg_action_t *act;
+	unsigned int *p;
+	unsigned int tindex;
+	unsigned int tlabel;
+
+	act = *((cfg_action_t**)param);
+	p = (unsigned int*)((char*)param + sizeof(cfg_action_t*));
+	tindex = p[0];
+	tlabel = p[1];
+
+	if(act!=NULL)
+		tmb.t_continue(tindex, tlabel, act);
+	/* param is freed along with the async task strucutre in core */
+}
+
+/**
+ *
+ */
+int async_send_task(sip_msg_t* msg, cfg_action_t *act)
+{
+	async_task_t *at;
+	tm_cell_t *t = 0;
+	unsigned int tindex;
+	unsigned int tlabel;
+	int dsize;
+	unsigned int *p;
+
+	t = tmb.t_gett();
+	if (t==NULL || t==T_UNDEFINED)
+	{
+		if(tmb.t_newtran(msg)<0)
+		{
+			LM_ERR("cannot create the transaction\n");
+			return -1;
+		}
+		t = tmb.t_gett();
+		if (t==NULL || t==T_UNDEFINED)
+		{
+			LM_ERR("cannot lookup the transaction\n");
+			return -1;
+		}
+	}
+	dsize = sizeof(async_task_t) + sizeof(cfg_action_t*)
+		+ 2*sizeof(unsigned int);
+	at = (async_task_t*)shm_malloc(dsize);
+	if(at==NULL)
+	{
+		LM_ERR("no more shm\n");
+		return -1;
+	}
+	memset(at, 0, dsize);
+	if(tmb.t_suspend(msg, &tindex, &tlabel)<0)
+	{
+		LM_ERR("failed to suppend the processing\n");
+		shm_free(at);
+		return -1;
+	}
+	at->exec = async_exec_task;
+	at->param = (char*)at + sizeof(async_task_t);
+	*((cfg_action_t**)at->param) = act;
+	p = (unsigned int*)((char*)at->param + sizeof(cfg_action_t*));
+	p[0] = tindex;
+	p[1] = tlabel;
+	async_task_push(at);
+	return 0;
+}

+ 3 - 1
modules/async/async_sleep.h

@@ -42,8 +42,10 @@ int async_init_timer_list(void);
 
 int async_destroy_timer_list(void);
 
-int async_sleep(struct sip_msg* msg, int seconds, cfg_action_t *act);
+int async_sleep(sip_msg_t* msg, int seconds, cfg_action_t *act);
 
 void async_timer_exec(unsigned int ticks, void *param);
 
+int async_send_task(sip_msg_t* msg, cfg_action_t *act);
+
 #endif