2
0
Эх сурвалжийг харах

async: added support for millisecond resolution sleep

- new ms_timer parameter to enable millisecond precision timer
- new async_ms_route and async_ms_sleep functions with milliseconds as a param
- implementation:
  Each async_ms_sleep adds an entry to a linked list sorted by expiry time.
  List is checked every ms_timer ms for expired entries.
  All expired entries are pushed for execution on a pool of async workers.
Paweł Pierścionek 6 жил өмнө
parent
commit
ce710ce13a

+ 133 - 1
src/modules/async/async_mod.c

@@ -41,15 +41,20 @@
 MODULE_VERSION
 
 static int async_workers = 1;
+static int async_ms_timer = 0;
 
 static int mod_init(void);
 static int child_init(int);
 static void mod_destroy(void);
 
 static int w_async_sleep(sip_msg_t *msg, char *sec, char *str2);
+static int w_async_ms_sleep(sip_msg_t *msg, char *sec, char *str2);
 static int fixup_async_sleep(void **param, int param_no);
+
 static int w_async_route(sip_msg_t *msg, char *rt, char *sec);
+static int w_async_ms_route(sip_msg_t *msg, char *rt, char *sec);
 static int fixup_async_route(void **param, int param_no);
+
 static int w_async_task_route(sip_msg_t *msg, char *rt, char *p2);
 static int fixup_async_task_route(void **param, int param_no);
 
@@ -60,8 +65,12 @@ struct tm_binds tmb;
 static cmd_export_t cmds[]={
 	{"async_route", (cmd_function)w_async_route, 2, fixup_async_route,
 		0, REQUEST_ROUTE|FAILURE_ROUTE},
+	{"async_ms_route", (cmd_function)w_async_ms_route, 2, fixup_async_route,
+		0, REQUEST_ROUTE|FAILURE_ROUTE},
 	{"async_sleep", (cmd_function)w_async_sleep, 1, fixup_async_sleep,
 		0, REQUEST_ROUTE|FAILURE_ROUTE},
+	{"async_ms_sleep", (cmd_function)w_async_ms_sleep, 1, fixup_async_sleep,
+		0, REQUEST_ROUTE|FAILURE_ROUTE},
 	{"async_task_route", (cmd_function)w_async_task_route, 1, fixup_async_task_route,
 		0, REQUEST_ROUTE|FAILURE_ROUTE},
 	{0, 0, 0, 0, 0, 0}
@@ -69,6 +78,7 @@ static cmd_export_t cmds[]={
 
 static param_export_t params[]={
 	{"workers",     INT_PARAM,   &async_workers},
+	{"ms_timer",    INT_PARAM,   &async_ms_timer},
 	{0, 0, 0}
 };
 
@@ -105,7 +115,17 @@ static int mod_init(void)
 		return -1;
 	}
 
-	register_basic_timers(async_workers);
+        if(async_ms_timer == 0) {
+                LM_INFO("ms_timer is set to 0. Disabling async_ms_sleep and async_ms_route functions\n");
+        } else {
+		if(async_init_ms_timer_list() < 0) {
+			LM_ERR("cannot initialize internal structure\n");
+			return -1;
+		}
+                LM_INFO("Enabled async_ms_sleep and async_ms_route functions with resolution of %dms\n", async_ms_timer);
+	}
+
+	register_basic_timers(async_workers + (async_ms_timer > 0));
 
 	return 0;
 }
@@ -131,6 +151,13 @@ static int child_init(int rank)
 			return -1; /* error */
 		}
 	}
+	
+	if((async_ms_timer > 0) && fork_basic_utimer(PROC_TIMER, "ASYNC MOD MILLI TIMER SINGLETON", 1 /*socks flag*/,
+			   async_mstimer_exec, NULL, 1000 * async_ms_timer /*milliseconds*/)
+			< 0) {
+		LM_ERR("failed to register millisecond timer singleton as process (%d)\n", i);
+		return -1; /* error */
+	}
 
 	return 0;
 }
@@ -141,6 +168,7 @@ static int child_init(int rank)
 static void mod_destroy(void)
 {
 	async_destroy_timer_list();
+	async_destroy_ms_timer_list();
 }
 
 /**
@@ -183,6 +211,46 @@ static int w_async_sleep(sip_msg_t *msg, char *sec, char *str2)
 	return -1;
 }
 
+/**
+ *
+ */
+static int w_async_ms_sleep(sip_msg_t *msg, char *sec, char *str2)
+{
+	int s;
+	async_param_t *ap;
+
+	if(msg == NULL)
+		return -1;
+
+	if(faked_msg_match(msg)) {
+		LM_ERR("invalid usage for faked message\n");
+		return -1;
+	}
+
+	if(async_workers <= 0) {
+		LM_ERR("no async mod timer workers (modparam missing?)\n");
+		return -1;
+	}
+
+	ap = (async_param_t *)sec;
+	if(fixup_get_ivalue(msg, ap->pinterval, &s) != 0) {
+		LM_ERR("no async sleep time value\n");
+		return -1;
+	}
+	if(ap->type == 0) {
+		if(ap->u.paction == NULL || ap->u.paction->next == NULL) {
+			LM_ERR("cannot be executed as last action in a route block\n");
+			return -1;
+		}
+		if(async_ms_sleep(msg, s, ap->u.paction->next, NULL) < 0)
+			return -1;
+		/* force exit in config */
+		return 0;
+	}
+
+	return -1;
+}
+
 /**
  *
  */
@@ -243,6 +311,42 @@ int ki_async_route(sip_msg_t *msg, str *rn, int s)
 	return 0;
 }
 
+/**
+ *
+ */
+int ki_async_ms_route(sip_msg_t *msg, str *rn, int s)
+{
+	cfg_action_t *act = NULL;
+	int ri;
+	sr_kemi_eng_t *keng = NULL;
+
+	if(faked_msg_match(msg)) {
+		LM_ERR("invalid usage for faked message\n");
+		return -1;
+	}
+
+	keng = sr_kemi_eng_get();
+	if(keng == NULL) {
+		ri = route_lookup(&main_rt, rn->s);
+		if(ri >= 0) {
+			act = main_rt.rlist[ri];
+			if(act == NULL) {
+				LM_ERR("empty action lists in route block [%.*s]\n", rn->len,
+						rn->s);
+				return -1;
+			}
+		} else {
+			LM_ERR("route block not found: %.*s\n", rn->len, rn->s);
+			return -1;
+		}
+	}
+
+	if(async_ms_sleep(msg, s, act, rn) < 0)
+		return -1;
+	/* force exit in config */
+	return 0;
+}
+
 /**
  *
  */
@@ -271,6 +375,34 @@ static int w_async_route(sip_msg_t *msg, char *rt, char *sec)
 	return ki_async_route(msg, &rn, s);
 }
 
+/**
+ *
+ */
+static int w_async_ms_route(sip_msg_t *msg, char *rt, char *sec)
+{
+	int s;
+	str rn;
+
+	if(msg == NULL)
+		return -1;
+
+	if(async_workers <= 0) {
+		LM_ERR("no async mod timer workers\n");
+		return -1;
+	}
+
+	if(fixup_get_svalue(msg, (gparam_t *)rt, &rn) != 0) {
+		LM_ERR("no async route block name\n");
+		return -1;
+	}
+
+	if(fixup_get_ivalue(msg, (gparam_t *)sec, &s) != 0) {
+		LM_ERR("no async interval value\n");
+		return -1;
+	}
+	return ki_async_route(msg, &rn, s);
+}
+
 /**
  *
  */

+ 203 - 7
src/modules/async/async_sleep.c

@@ -41,6 +41,14 @@
 extern struct tm_binds tmb;
 
 /* clang-format off */
+typedef struct async_task_param {
+	unsigned int tindex;
+	unsigned int tlabel;
+	cfg_action_t *ract;
+	char cbname[ASYNC_CBNAME_SIZE];
+	int cbname_len;
+} async_task_param_t;
+
 typedef struct async_item {
 	unsigned int tindex;
 	unsigned int tlabel;
@@ -51,6 +59,12 @@ typedef struct async_item {
 	struct async_item *next;
 } async_item_t;
 
+typedef struct async_ms_item {
+	async_task_t *at;
+	struct timeval due;
+	struct async_ms_item *next;
+} async_ms_item_t;
+
 typedef struct async_slot {
 	async_item_t *lstart;
 	async_item_t *lend;
@@ -58,6 +72,15 @@ typedef struct async_slot {
 } async_slot_t;
 
 #define ASYNC_RING_SIZE	100
+#define MAX_MS_SLEEP 30*1000
+#define MAX_MS_SLEEP_QUEUE 10000
+
+static struct async_ms_list {
+	async_ms_item_t *lstart;
+	async_ms_item_t *lend;
+	int	len;
+	gen_lock_t lock;
+} *_async_ms_list = NULL;
 
 static struct async_list_head {
 	async_slot_t ring[ASYNC_RING_SIZE];
@@ -95,6 +118,32 @@ int async_init_timer_list(void)
 	return 0;
 }
 
+int async_init_ms_timer_list(void)
+{
+	_async_ms_list = (struct async_ms_list *)shm_malloc(
+			sizeof(struct async_ms_list));
+	if(_async_ms_list == NULL) {
+		LM_ERR("no more shm\n");
+		return -1;
+	}
+	memset(_async_ms_list, 0, sizeof(struct async_ms_list));
+	if(lock_init(&_async_ms_list->lock) == 0) {
+		LM_ERR("cannot init lock \n");
+		shm_free(_async_ms_list);
+		_async_ms_list = 0;
+		return -1;
+	}
+	return 0;
+}
+
+int async_destroy_ms_timer_list(void)
+{	
+	if (_async_ms_list) {
+		lock_destroy(&_async_ms_list->lock);
+	}
+	return 0;
+}
+
 int async_destroy_timer_list(void)
 {
 	int i;
@@ -109,6 +158,45 @@ int async_destroy_timer_list(void)
 	return 0;
 }
 
+int async_insert_item(async_ms_item_t *ai) 
+{
+	struct timeval *due = &ai->due;
+	
+	if (unlikely(_async_ms_list == NULL))
+		return -1;
+	lock_get(&_async_ms_list->lock);
+	// Check if we want to insert in front
+	if (_async_ms_list->lstart == NULL || timercmp(due, &_async_ms_list->lstart->due, <=)) {
+		ai->next = _async_ms_list->lstart;
+		_async_ms_list->lstart = ai;
+		if (_async_ms_list->lend == NULL)
+			_async_ms_list->lend = ai;
+	} else {
+		// Check if we want to add to the tail
+		if (_async_ms_list->lend && timercmp(due, &_async_ms_list->lend->due, >)) {
+			_async_ms_list->lend->next = ai;
+			_async_ms_list->lend = ai;
+		} else {
+			async_ms_item_t *aip;
+			// Find the place to insert into a sorted timer list
+			// Most likely head && tail scanarios are covered above
+			int i = 1;
+			for (aip = _async_ms_list->lstart; aip->next; aip = aip->next, i++) {
+				if (timercmp(due, &aip->next->due, <=)) {
+					ai->next = aip->next;
+					aip->next = ai;
+					break;
+				}
+			}
+		}
+	}
+	_async_ms_list->len++;
+	lock_release(&_async_ms_list->lock);
+	return 0;	
+}
+
+
+
 int async_sleep(sip_msg_t *msg, int seconds, cfg_action_t *act, str *cbname)
 {
 	int slot;
@@ -209,13 +297,37 @@ void async_timer_exec(unsigned int ticks, void *param)
 	}
 }
 
-typedef struct async_task_param {
-	unsigned int tindex;
-	unsigned int tlabel;
-	cfg_action_t *ract;
-	char cbname[ASYNC_CBNAME_SIZE];
-	int cbname_len;
-} async_task_param_t;
+void async_mstimer_exec(unsigned int ticks, void *param)
+{
+	struct timeval now;
+	gettimeofday(&now, NULL);
+
+	if (_async_ms_list == NULL)
+		return;
+	lock_get(&_async_ms_list->lock);
+	
+	async_ms_item_t *aip, *next;
+	int i = 0;
+	for (aip = _async_ms_list->lstart; aip; aip = next, i++) {
+		next = aip->next;
+		if (timercmp(&now, &aip->due, >=)) {
+			if ((_async_ms_list->lstart = next) == NULL) 
+				_async_ms_list->lend = NULL;
+			if (async_task_push(aip->at)<0) {
+		                shm_free(aip->at);
+			}
+			_async_ms_list->len--;
+			continue;
+		}
+		break;
+	}
+
+	lock_release(&_async_ms_list->lock);
+	
+	return;
+
+}
+
 
 /**
  *
@@ -246,6 +358,90 @@ void async_exec_task(void *param)
 	/* param is freed along with the async task strucutre in core */
 }
 
+int async_ms_sleep(sip_msg_t *msg, int milliseconds, cfg_action_t *act, str *cbname)
+{
+	async_ms_item_t *ai;
+	int dsize;
+	tm_cell_t *t = 0;
+	unsigned int tindex;
+	unsigned int tlabel;
+	async_task_param_t *atp;
+	async_task_t *at;
+
+	if(milliseconds <= 0) {
+		LM_ERR("negative or zero sleep time (%d)\n", milliseconds);
+		return -1;
+	}
+	if(milliseconds >= MAX_MS_SLEEP) {
+		LM_ERR("max sleep time is %d msec\n", MAX_MS_SLEEP);
+		return -1;
+	}
+	if(_async_ms_list->len >= MAX_MS_SLEEP_QUEUE) {
+		LM_ERR("max sleep queue length exceeded (%d) \n", MAX_MS_SLEEP_QUEUE);
+		return -1;
+	}
+	if(cbname && cbname->len>=ASYNC_CBNAME_SIZE-1) {
+		LM_ERR("callback name is too long: %.*s\n", cbname->len, cbname->s);
+		return -1;
+	}
+	dsize = sizeof(async_task_t) + sizeof(async_task_param_t) + sizeof(async_ms_item_t);
+
+	at = (async_task_t *)shm_malloc(dsize);
+	if(at == NULL) {
+		LM_ERR("no more shm memory\n");
+		return -1;
+	}
+	memset(at, 0, dsize);
+	at->param = (char *)at + sizeof(async_task_t);
+	atp = (async_task_param_t *)at->param;
+	ai = (async_ms_item_t *) ((char *)at +  sizeof(async_task_t) + sizeof(async_task_param_t));
+	ai->at = at;
+
+	if(cbname && cbname->len>=ASYNC_CBNAME_SIZE-1) {
+		LM_ERR("callback name is too long: %.*s\n", cbname->len, cbname->s);
+		return -1;
+	}
+
+	t = tmb.t_gett();
+	if(t == NULL || t == T_UNDEFINED) {
+		if(tmb.t_newtran(msg) < 0) {
+			LM_ERR("cannot create the transaction\n");
+			return -1;
+		}
+		t = tmb.t_gett();
+		if(t == NULL || t == T_UNDEFINED) {
+			LM_ERR("cannot lookup the transaction\n");
+			return -1;
+		}
+	}
+	
+	if(tmb.t_suspend(msg, &tindex, &tlabel) < 0) {
+		LM_ERR("failed to suspend the processing\n");
+		shm_free(ai);
+		return -1;
+	}
+	at->exec = async_exec_task;
+	at->param = atp;
+	atp->ract = act;
+	atp->tindex = tindex;
+	atp->tlabel = tlabel;
+	if(cbname && cbname->len>0) {
+		memcpy(atp->cbname, cbname->s, cbname->len);
+		atp->cbname[cbname->len] = '\0';
+		atp->cbname_len = cbname->len;
+	}
+	
+	struct timeval now, upause;
+	gettimeofday(&now, NULL);
+	upause.tv_sec = milliseconds / 1000; 
+	upause.tv_usec = (milliseconds * 1000) % 1000000;
+	
+	timeradd(&now, &upause, &ai->due);	
+	async_insert_item(ai);
+
+	return 0;
+}
+
 /**
  *
  */

+ 5 - 3
src/modules/async/async_sleep.h

@@ -39,13 +39,15 @@ typedef struct async_param {
 /* clang-format on */
 
 int async_init_timer_list(void);
-
 int async_destroy_timer_list(void);
-
 int async_sleep(sip_msg_t *msg, int seconds, cfg_action_t *act, str *cbname);
-
 void async_timer_exec(unsigned int ticks, void *param);
 
+int async_init_ms_timer_list(void);
+int async_destroy_ms_timer_list(void);
+int async_ms_sleep(sip_msg_t *msg, int milliseconds, cfg_action_t *act, str *cbname);
+void async_mstimer_exec(unsigned int ticks, void *param);
+
 int async_send_task(sip_msg_t *msg, cfg_action_t *act, str *cbname);
 
 #endif

+ 113 - 0
src/modules/async/doc/async_admin.xml

@@ -85,6 +85,28 @@
 ...
 modparam("async", "workers", 2)
 ...
+</programlisting>
+		</example>
+	</section>
+	<section>
+		<title><varname>ms_timer</varname> (int)</title>
+		<para>
+			Enables millisecond timer for async_ms_sleep() and async_ms_route() functions.
+			The integer value is the timer resolution in milliseconds.
+			ms_timer = 1 enables 1 millisecond timer but generates higher load on the system.
+			ms_timer = 20 enables 20 ms timer. 
+		</para>
+		<para>
+		<emphasis>
+			Default value is 0.
+		</emphasis>
+		</para>
+		<example>
+		<title>Set <varname>ms_timer</varname> parameter</title>
+		<programlisting format="linespecific">
+...
+modparam("async", "ms_timer", 1)
+...
 </programlisting>
 		</example>
 	</section>
@@ -134,6 +156,52 @@ route[RESUME] {
    exit;
 }
 ...
+</programlisting>
+	    </example>
+	</section>
+	<section id="async.f.async_ms_route">
+	    <title>
+		<function moreinfo="none">async_ms_route(routename, milliseconds)</function>
+	    </title>
+	    <para>
+		Simulate a sleep of 'milliseconds' and then continue the processing of the SIP
+		request with the route[routename]. In case of internal errors, the
+		function returns false, otherwise the function exits the execution of
+		the script at that moment (return 0 behaviour).
+		This function works only if the ms_timer parameter has a value greater then 0.
+		</para>
+		<para>
+		The routename parameter can be a static string or a dynamic string
+		value with config variables.
+		</para>
+		<para>
+		The sleep parameter represent the number of milliseconds to suspend the
+		processing of a SIP request. Maximum value is 30000 (30 sec). The parameter can be
+		a static integer or a variable holding an integer.
+		</para>
+		<para>
+		Since the SIP request handling is resumed in another process,
+		the config file execution state is practically lost. Therefore beware
+		that the execution of config after resume will end once the
+		route[routename] is finished.
+		</para>
+		<para>
+		This function can be used from REQUEST_ROUTE.
+		</para>
+		<example>
+		<title><function>async_ms_route</function> usage</title>
+		<programlisting format="linespecific">
+...
+request_route {
+    ...
+    async_ms_route("RESUME", "250");
+    ...
+}
+route[RESUME] {
+   send_reply("404", "Not found");
+   exit;
+}
+...
 </programlisting>
 	    </example>
 	</section>
@@ -167,6 +235,51 @@ exit;
 	    </example>
 	</section>
 
+	<section id="async.f.async_ms_sleep">
+	    <title>
+		<function moreinfo="none">async_ms_sleep(milliseconds)</function>
+	    </title>
+	    <para>
+		Simulate a sleep of 'milliseconds' and then continue the processing of SIP
+		request with the next action. In case of internal errors, the function
+		returns false.
+		This function works only if the ms_timer parameter has a value greater then 0.
+		</para>
+		<para>
+		The sleep parameter represent the number of milliseconds to suspend the
+		processing of SIP request. Maximum value is 30000 (30 sec). The parameter can be
+		a static integer or a variable holding an integer.
+		</para>
+		<para>
+		This function can be used from REQUEST_ROUTE.
+		</para>
+		<example>
+		<title><function>async_ms_sleep</function> usage</title>
+		<programlisting format="linespecific">
+...
+route[REQUESTSHAPER] {
+        $var(res) = http_connect("leakybucket", 
+        			 "/add?key=$fd", $null, $null,"$avp(delay)");
+        $var(d) = $(avp(delay){s.int});
+	if ($var(d) > 0) {
+		# Delay the request by $avp(delay) ms
+		async_ms_sleep("$var(d)");
+		if (!t_relay()) {
+			sl_reply_error();
+		}
+		exit;
+	} 
+	# No delay
+	if (!t_relay()) {
+		sl_reply_error();
+	}
+        exit;
+}
+...
+</programlisting>
+	    </example>
+	</section>
+
 	<section id="async.f.async_task_route">
 	    <title>
 		<function moreinfo="none">async_task_route(routename)</function>