Bläddra i källkod

async: use t_continue_cb() for executing via kemi framework

Daniel-Constantin Mierla 7 år sedan
förälder
incheckning
6bdf8119c1
3 ändrade filer med 111 tillägg och 50 borttagningar
  1. 37 25
      src/modules/async/async_mod.c
  2. 72 23
      src/modules/async/async_sleep.c
  3. 2 2
      src/modules/async/async_sleep.h

+ 37 - 25
src/modules/async/async_mod.c

@@ -170,7 +170,7 @@ static int w_async_sleep(sip_msg_t *msg, char *sec, char *str2)
 			LM_ERR("cannot be executed as last action in a route block\n");
 			return -1;
 		}
-		if(async_sleep(msg, s, ap->u.paction->next) < 0)
+		if(async_sleep(msg, s, ap->u.paction->next, NULL) < 0)
 			return -1;
 		/* force exit in config */
 		return 0;
@@ -208,21 +208,27 @@ static int fixup_async_sleep(void **param, int param_no)
  */
 int ki_async_route(sip_msg_t *msg, str *rn, int s)
 {
-	cfg_action_t *act;
+	cfg_action_t *act = NULL;
 	int ri;
-
-	ri = route_get(&main_rt, rn->s);
-	if(ri < 0) {
-		LM_ERR("unable to find route block [%.*s]\n", rn->len, rn->s);
-		return -1;
-	}
-	act = main_rt.rlist[ri];
-	if(act == NULL) {
-		LM_ERR("empty action lists in route block [%.*s]\n", rn->len, rn->s);
-		return -1;
+	sr_kemi_eng_t *keng = NULL;
+
+	keng = sr_kemi_eng_get();
+	if(keng == NULL) {
+		ri = route_lookup(&main_rt, rn->s);
+		if(ri >= 0) {
+			act = main_rt.rlist[ri];
+			if(act == NULL) {
+				LM_ERR("empty action lists in route block [%.*s]\n", rn->len,
+						rn->s);
+				return -1;
+			}
+		} else {
+			LM_ERR("route block not found: %.*s\n", rn->len, rn->s);
+			return -1;
+		}
 	}
 
-	if(async_sleep(msg, s, act) < 0)
+	if(async_sleep(msg, s, act, rn) < 0)
 		return -1;
 	/* force exit in config */
 	return 0;
@@ -277,21 +283,27 @@ static int fixup_async_route(void **param, int param_no)
  */
 int ki_async_task_route(sip_msg_t *msg, str *rn)
 {
-	cfg_action_t *act;
+	cfg_action_t *act = NULL;
 	int ri;
-
-	ri = route_get(&main_rt, rn->s);
-	if(ri < 0) {
-		LM_ERR("unable to find route block [%.*s]\n", rn->len, rn->s);
-		return -1;
-	}
-	act = main_rt.rlist[ri];
-	if(act == NULL) {
-		LM_ERR("empty action lists in route block [%.*s]\n", rn->len, rn->s);
-		return -1;
+	sr_kemi_eng_t *keng = NULL;
+
+	keng = sr_kemi_eng_get();
+	if(keng == NULL) {
+		ri = route_lookup(&main_rt, rn->s);
+		if(ri >= 0) {
+			act = main_rt.rlist[ri];
+			if(act == NULL) {
+				LM_ERR("empty action lists in route block [%.*s]\n", rn->len,
+						rn->s);
+				return -1;
+			}
+		} else {
+			LM_ERR("route block not found: %.*s\n", rn->len, rn->s);
+			return -1;
+		}
 	}
 
-	if(async_send_task(msg, act) < 0)
+	if(async_send_task(msg, act, rn) < 0)
 		return -1;
 	/* force exit in config */
 	return 0;

+ 72 - 23
src/modules/async/async_sleep.c

@@ -32,9 +32,11 @@
 #include "../../core/timer.h"
 #include "../../core/async_task.h"
 #include "../../modules/tm/tm_load.h"
+#include "../../core/kemi.h"
 
 #include "async_sleep.h"
 
+#define ASYNC_CBNAME_SIZE 64
 /* tm */
 extern struct tm_binds tmb;
 
@@ -43,7 +45,9 @@ typedef struct async_item {
 	unsigned int tindex;
 	unsigned int tlabel;
 	unsigned int ticks;
-	cfg_action_t *act;
+	cfg_action_t *ract;
+	char cbname[ASYNC_CBNAME_SIZE];
+	int cbname_len;
 	struct async_item *next;
 } async_item_t;
 
@@ -105,7 +109,7 @@ int async_destroy_timer_list(void)
 	return 0;
 }
 
-int async_sleep(struct sip_msg *msg, int seconds, cfg_action_t *act)
+int async_sleep(sip_msg_t *msg, int seconds, cfg_action_t *act, str *cbname)
 {
 	int slot;
 	unsigned int ticks;
@@ -120,6 +124,10 @@ int async_sleep(struct sip_msg *msg, int seconds, cfg_action_t *act)
 		LM_ERR("max sleep time is %d sec (%d)\n", ASYNC_RING_SIZE, seconds);
 		return -1;
 	}
+	if(cbname && cbname->len>=ASYNC_CBNAME_SIZE-1) {
+		LM_ERR("callback name is too long: %.*s\n", cbname->len, cbname->s);
+		return -1;
+	}
 	t = tmb.t_gett();
 	if(t == NULL || t == T_UNDEFINED) {
 		if(tmb.t_newtran(msg) < 0) {
@@ -142,7 +150,12 @@ int async_sleep(struct sip_msg *msg, int seconds, cfg_action_t *act)
 	}
 	memset(ai, 0, sizeof(async_item_t));
 	ai->ticks = ticks;
-	ai->act = act;
+	ai->ract = act;
+	if(cbname && cbname->len>0) {
+		memcpy(ai->cbname, cbname->s, cbname->len);
+		ai->cbname[cbname->len] = '\0';
+		ai->cbname_len = cbname->len;
+	}
 	if(tmb.t_suspend(msg, &ai->tindex, &ai->tlabel) < 0) {
 		LM_ERR("failed to suspend the processing\n");
 		shm_free(ai);
@@ -160,6 +173,9 @@ void async_timer_exec(unsigned int ticks, void *param)
 {
 	int slot;
 	async_item_t *ai;
+	sr_kemi_eng_t *keng = NULL;
+	str cbname = STR_NULL;
+	str evname = str_init("async:timer-exec");
 
 	if(_async_list_head == NULL)
 		return;
@@ -175,33 +191,57 @@ void async_timer_exec(unsigned int ticks, void *param)
 
 		if(ai == NULL)
 			break;
-		if(ai->act != NULL) {
-			tmb.t_continue(ai->tindex, ai->tlabel, ai->act);
+		if(ai->ract != NULL) {
+			tmb.t_continue(ai->tindex, ai->tlabel, ai->ract);
 			ksr_msg_env_reset();
+		} else {
+			keng = sr_kemi_eng_get();
+			if(keng != NULL && ai->cbname_len>0) {
+				cbname.s = ai->cbname;
+				cbname.len = ai->cbname_len;
+				tmb.t_continue_cb(ai->tindex, ai->tlabel, &cbname, &evname);
+				ksr_msg_env_reset();
+			} else {
+				LM_WARN("no callback to be executed\n");
+			}
 		}
 		shm_free(ai);
 	}
 }
 
+typedef struct async_task_param {
+	unsigned int tindex;
+	unsigned int tlabel;
+	cfg_action_t *ract;
+	char cbname[ASYNC_CBNAME_SIZE];
+	int cbname_len;
+} async_task_param_t;
 
 /**
  *
  */
 void async_exec_task(void *param)
 {
-	cfg_action_t *act;
-	unsigned int *p;
-	unsigned int tindex;
-	unsigned int tlabel;
+	async_task_param_t *atp;
+	sr_kemi_eng_t *keng = NULL;
+	str cbname = STR_NULL;
+	str evname = str_init("async:task-exec");
 
-	act = *((cfg_action_t **)param);
-	p = (unsigned int *)((char *)param + sizeof(cfg_action_t *));
-	tindex = p[0];
-	tlabel = p[1];
+	atp = (async_task_param_t *)param;
 
-	if(act != NULL) {
-		tmb.t_continue(tindex, tlabel, act);
+	if(atp->ract != NULL) {
+		tmb.t_continue(atp->tindex, atp->tlabel, atp->ract);
 		ksr_msg_env_reset();
+	} else {
+		keng = sr_kemi_eng_get();
+		if(keng != NULL && atp->cbname_len > 0) {
+			cbname.s = atp->cbname;
+			cbname.len = atp->cbname_len;
+			tmb.t_continue_cb(atp->tindex, atp->tlabel, &cbname, &evname);
+			ksr_msg_env_reset();
+		} else {
+			LM_WARN("no callback to be executed\n");
+		}
 	}
 	/* param is freed along with the async task strucutre in core */
 }
@@ -209,14 +249,19 @@ void async_exec_task(void *param)
 /**
  *
  */
-int async_send_task(sip_msg_t *msg, cfg_action_t *act)
+int async_send_task(sip_msg_t *msg, cfg_action_t *act, str *cbname)
 {
 	async_task_t *at;
 	tm_cell_t *t = 0;
 	unsigned int tindex;
 	unsigned int tlabel;
 	int dsize;
-	unsigned int *p;
+	async_task_param_t *atp;
+
+	if(cbname && cbname->len>=ASYNC_CBNAME_SIZE-1) {
+		LM_ERR("callback name is too long: %.*s\n", cbname->len, cbname->s);
+		return -1;
+	}
 
 	t = tmb.t_gett();
 	if(t == NULL || t == T_UNDEFINED) {
@@ -230,8 +275,7 @@ int async_send_task(sip_msg_t *msg, cfg_action_t *act)
 			return -1;
 		}
 	}
-	dsize = sizeof(async_task_t) + sizeof(cfg_action_t *)
-			+ 2 * sizeof(unsigned int);
+	dsize = sizeof(async_task_t) + sizeof(async_task_param_t);
 	at = (async_task_t *)shm_malloc(dsize);
 	if(at == NULL) {
 		LM_ERR("no more shm memory\n");
@@ -245,10 +289,15 @@ int async_send_task(sip_msg_t *msg, cfg_action_t *act)
 	}
 	at->exec = async_exec_task;
 	at->param = (char *)at + sizeof(async_task_t);
-	*((cfg_action_t **)at->param) = act;
-	p = (unsigned int *)((char *)at->param + sizeof(cfg_action_t *));
-	p[0] = tindex;
-	p[1] = tlabel;
+	atp = (async_task_param_t *)at->param;
+	atp->ract = act;
+	atp->tindex = tindex;
+	atp->tlabel = tlabel;
+	if(cbname && cbname->len>0) {
+		memcpy(atp->cbname, cbname->s, cbname->len);
+		atp->cbname[cbname->len] = '\0';
+		atp->cbname_len = cbname->len;
+	}
 	async_task_push(at);
 	return 0;
 }

+ 2 - 2
src/modules/async/async_sleep.h

@@ -42,10 +42,10 @@ int async_init_timer_list(void);
 
 int async_destroy_timer_list(void);
 
-int async_sleep(sip_msg_t *msg, int seconds, cfg_action_t *act);
+int async_sleep(sip_msg_t *msg, int seconds, cfg_action_t *act, str *cbname);
 
 void async_timer_exec(unsigned int ticks, void *param);
 
-int async_send_task(sip_msg_t *msg, cfg_action_t *act);
+int async_send_task(sip_msg_t *msg, cfg_action_t *act, str *cbname);
 
 #endif