Browse Source

dmq: add option for non-locking workers

- fixes high CPU condition on some systems (e.g. FreeBSD)
- reported by @soapnix, GH #822
- incorporates patch by Tom Beard (@tombeard)
Charles Chance 8 years ago
parent
commit
71a882122c
5 changed files with 89 additions and 31 deletions
  1. 42 20
      src/modules/dmq/README
  2. 2 0
      src/modules/dmq/dmq.c
  3. 1 0
      src/modules/dmq/dmq.h
  4. 25 0
      src/modules/dmq/doc/dmq_admin.xml
  5. 19 11
      src/modules/dmq/worker.c

+ 42 - 20
src/modules/dmq/README

@@ -14,8 +14,6 @@ Edited by
 
 
 Marius Ovidiu Bucur
 Marius Ovidiu Bucur
 
 
-Edited by
-
 Charles Chance
 Charles Chance
 
 
    Copyright © 2011 Marius Bucur
    Copyright © 2011 Marius Bucur
@@ -41,7 +39,8 @@ Charles Chance
               3.2. notification_address(str)
               3.2. notification_address(str)
               3.3. multi_notify(int)
               3.3. multi_notify(int)
               3.4. num_workers(int)
               3.4. num_workers(int)
-              3.5. ping_interval(int)
+              3.5. worker_usleep(int)
+              3.6. ping_interval(int)
 
 
         4. Functions
         4. Functions
 
 
@@ -74,13 +73,14 @@ Charles Chance
    1.3. Set notification_address parameter
    1.3. Set notification_address parameter
    1.4. Set multi_notify parameter
    1.4. Set multi_notify parameter
    1.5. Set num_workers parameter
    1.5. Set num_workers parameter
-   1.6. Set ping_interval parameter
-   1.7. dmq_handle_message usage
-   1.8. dmq_send_message usage
-   1.9. dmq_bcast_message usage
-   1.10. dmq_t_replicate usage
-   1.11. dmq_is_from_node usage
-   1.12. dmq.list_nodes usage
+   1.6. Set worker_usleep parameter
+   1.7. Set ping_interval parameter
+   1.8. dmq_handle_message usage
+   1.9. dmq_send_message usage
+   1.10. dmq_bcast_message usage
+   1.11. dmq_t_replicate usage
+   1.12. dmq_is_from_node usage
+   1.13. dmq.list_nodes usage
    2.1. dmq_api_t structure
    2.1. dmq_api_t structure
    2.2. register_dmq_peer usage
    2.2. register_dmq_peer usage
    2.3. bcast_message usage
    2.3. bcast_message usage
@@ -102,7 +102,8 @@ Chapter 1. Admin Guide
         3.2. notification_address(str)
         3.2. notification_address(str)
         3.3. multi_notify(int)
         3.3. multi_notify(int)
         3.4. num_workers(int)
         3.4. num_workers(int)
-        3.5. ping_interval(int)
+        3.5. worker_usleep(int)
+        3.6. ping_interval(int)
 
 
    4. Functions
    4. Functions
 
 
@@ -178,7 +179,8 @@ sip:192.168.40.17:5060;status=active
    3.2. notification_address(str)
    3.2. notification_address(str)
    3.3. multi_notify(int)
    3.3. multi_notify(int)
    3.4. num_workers(int)
    3.4. num_workers(int)
-   3.5. ping_interval(int)
+   3.5. worker_usleep(int)
+   3.6. ping_interval(int)
 
 
 3.1. server_address(str)
 3.1. server_address(str)
 
 
@@ -231,14 +233,34 @@ modparam("dmq", "multi_notify", 1)
 modparam("dmq", "num_workers", 4)
 modparam("dmq", "num_workers", 4)
 ...
 ...
 
 
-3.5. ping_interval(int)
+3.5. worker_usleep(int)
+
+   The default locking/synchronisation mechanism between producer/consumer
+   threads is the optimum for most environments. On some systems (e.g.
+   FreeBSD) it can cause high CPU load and in such cases, it can be useful
+   to disable locking and switch to polling for tasks at set intervals
+   instead - putting the thread to sleep in-between and taking it out of
+   process during that time.
+
+   A value >0 will disable the default locking and set the polling
+   interval (in microseconds), which can be tuned to suit the specific
+   environment.
+
+   Default value is 0 (recommended for most systems).
+
+   Example 1.6. Set worker_usleep parameter
+...
+modparam("dmq", "worker_usleep", 1000)
+...
+
+3.6. ping_interval(int)
 
 
    The number of seconds between node pings (for checking status of other
    The number of seconds between node pings (for checking status of other
    nodes).
    nodes).
 
 
    Minimum value is “60” (default).
    Minimum value is “60” (default).
 
 
-   Example 1.6. Set ping_interval parameter
+   Example 1.7. Set ping_interval parameter
 ...
 ...
 modparam("dmq", "ping_interval", 90)
 modparam("dmq", "ping_interval", 90)
 ...
 ...
@@ -258,7 +280,7 @@ modparam("dmq", "ping_interval", 90)
 
 
    This function can be used from REQUEST_ROUTE.
    This function can be used from REQUEST_ROUTE.
 
 
-   Example 1.7. dmq_handle_message usage
+   Example 1.8. dmq_handle_message usage
 ...
 ...
         if(is_method("KDMQ"))
         if(is_method("KDMQ"))
         {
         {
@@ -278,7 +300,7 @@ modparam("dmq", "ping_interval", 90)
 
 
    This function can be used from any route.
    This function can be used from any route.
 
 
-   Example 1.8. dmq_send_message usage
+   Example 1.9. dmq_send_message usage
 ...
 ...
         dmq_send_message("peer_name", "sip:10.0.0.21:5060", "Message body...", "
         dmq_send_message("peer_name", "sip:10.0.0.21:5060", "Message body...", "
 text/plain");
 text/plain");
@@ -296,7 +318,7 @@ text/plain");
 
 
    This function can be used from any route.
    This function can be used from any route.
 
 
-   Example 1.9. dmq_bcast_message usage
+   Example 1.10. dmq_bcast_message usage
 ...
 ...
         dmq_bcast_message("peer_name", "Message body...", "text/plain");
         dmq_bcast_message("peer_name", "Message body...", "text/plain");
 ...
 ...
@@ -318,7 +340,7 @@ text/plain");
 
 
    This function can be used from REQUEST_ROUTE only.
    This function can be used from REQUEST_ROUTE only.
 
 
-   Example 1.10. dmq_t_replicate usage
+   Example 1.11. dmq_t_replicate usage
 ...
 ...
         dmq_t_replicate();
         dmq_t_replicate();
 ...
 ...
@@ -330,7 +352,7 @@ text/plain");
 
 
    This function can be used from REQUEST_ROUTE only.
    This function can be used from REQUEST_ROUTE only.
 
 
-   Example 1.11. dmq_is_from_node usage
+   Example 1.12. dmq_is_from_node usage
 ...
 ...
         # Example REGISTER block
         # Example REGISTER block
         if (dmq_is_from_node()) {
         if (dmq_is_from_node()) {
@@ -351,7 +373,7 @@ op test again
 
 
    List the DMQ nodes. It has no parameters.
    List the DMQ nodes. It has no parameters.
 
 
-   Example 1.12. dmq.list_nodes usage
+   Example 1.13. dmq.list_nodes usage
 ...
 ...
 kamcmd dmq.list_nodes
 kamcmd dmq.list_nodes
 ...
 ...

+ 2 - 0
src/modules/dmq/dmq.c

@@ -57,6 +57,7 @@ int pid = 0;
 
 
 /* module parameters */
 /* module parameters */
 int num_workers = DEFAULT_NUM_WORKERS;
 int num_workers = DEFAULT_NUM_WORKERS;
+int worker_usleep = 0;
 str dmq_server_address = {0, 0};
 str dmq_server_address = {0, 0};
 str dmq_server_socket = {0, 0};
 str dmq_server_socket = {0, 0};
 struct sip_uri dmq_server_uri;
 struct sip_uri dmq_server_uri;
@@ -111,6 +112,7 @@ static param_export_t params[] = {
 	{"server_address", PARAM_STR, &dmq_server_address},
 	{"server_address", PARAM_STR, &dmq_server_address},
 	{"notification_address", PARAM_STR, &dmq_notification_address},
 	{"notification_address", PARAM_STR, &dmq_notification_address},
 	{"multi_notify", INT_PARAM, &multi_notify},
 	{"multi_notify", INT_PARAM, &multi_notify},
+	{"worker_usleep", INT_PARAM, &worker_usleep},
 	{0, 0, 0}
 	{0, 0, 0}
 };
 };
 
 

+ 1 - 0
src/modules/dmq/dmq.h

@@ -37,6 +37,7 @@
 #define MIN_PING_INTERVAL	5
 #define MIN_PING_INTERVAL	5
 
 
 extern int num_workers;
 extern int num_workers;
+extern int worker_usleep;
 extern dmq_worker_t* workers;
 extern dmq_worker_t* workers;
 extern dmq_peer_t* dmq_notification_peer;
 extern dmq_peer_t* dmq_notification_peer;
 extern str dmq_server_address;
 extern str dmq_server_address;

+ 25 - 0
src/modules/dmq/doc/dmq_admin.xml

@@ -171,6 +171,31 @@ modparam("dmq", "num_workers", 4)
 </programlisting>
 </programlisting>
                 </example>
                 </example>
         </section>
         </section>
+	<section id="dmq.p.worker_usleep">
+		<title><varname>worker_usleep</varname>(int)</title>
+		<para>
+		The default locking/synchronisation mechanism between producer/consumer
+		threads is the optimum for most environments. On some systems (e.g. FreeBSD)
+		it can cause high CPU load and in such cases, it can be useful to disable
+		locking and switch to polling for tasks at set intervals instead - putting
+		the thread to sleep in-between and taking it out of process during that time.
+		</para>
+		<para>
+		A value >0 will disable the default locking and set the polling interval
+		(in microseconds), which can be tuned to suit the specific environment.
+		</para>
+		<para>
+		<emphasis>Default value is 0 (recommended for most systems).</emphasis>
+		</para>
+		<example>
+		<title>Set <varname>worker_usleep</varname> parameter</title>
+		<programlisting format="linespecific">
+...
+modparam("dmq", "worker_usleep", 1000)
+...
+</programlisting>
+		</example>
+	</section>
         <section id="dmq.p.ping_interval">
         <section id="dmq.p.ping_interval">
                 <title><varname>ping_interval</varname>(int)</title>
                 <title><varname>ping_interval</varname>(int)</title>
                 <para>
                 <para>

+ 19 - 11
src/modules/dmq/worker.c

@@ -79,12 +79,16 @@ void worker_loop(int id)
 
 
 	worker = &workers[id];
 	worker = &workers[id];
 	for(;;) {
 	for(;;) {
-		LM_DBG("dmq_worker [%d %d] getting lock\n", id, my_pid());
-		lock_get(&worker->lock);
-		LM_DBG("dmq_worker [%d %d] lock acquired\n", id, my_pid());
-		/* multiple lock_release calls might be performed, so remove
-		 * from queue until empty */
-		do {
+		if (worker_usleep <= 0) {
+			LM_DBG("dmq_worker [%d %d] getting lock\n", id, my_pid());
+			lock_get(&worker->lock);
+			LM_DBG("dmq_worker [%d %d] lock acquired\n", id, my_pid());
+		} else {
+			sleep_us(worker_usleep);
+		}
+
+		/* remove from queue until empty */
+		while(job_queue_size(worker->queue) > 0) {
 			/* fill the response with 0's */
 			/* fill the response with 0's */
 			memset(&peer_response, 0, sizeof(peer_response));
 			memset(&peer_response, 0, sizeof(peer_response));
 			current_job = job_queue_pop(worker->queue);
 			current_job = job_queue_pop(worker->queue);
@@ -136,7 +140,7 @@ void worker_loop(int id)
 				shm_free(current_job);
 				shm_free(current_job);
 				worker->jobs_processed++;
 				worker->jobs_processed++;
 			}
 			}
-		} while(job_queue_size(worker->queue) > 0);
+		}
 	}
 	}
 }
 }
 
 
@@ -198,7 +202,9 @@ int add_dmq_job(struct sip_msg* msg, dmq_peer_t* peer)
 	if (job_queue_push(worker->queue, &new_job)<0) {
 	if (job_queue_push(worker->queue, &new_job)<0) {
 		goto error;
 		goto error;
 	}
 	}
-	lock_release(&worker->lock);
+	if (worker_usleep <= 0) {
+		lock_release(&worker->lock);
+	}
 	return 0;
 	return 0;
 error:
 error:
 	if (cloned_msg!=NULL) {
 	if (cloned_msg!=NULL) {
@@ -213,9 +219,11 @@ error:
 void init_worker(dmq_worker_t* worker)
 void init_worker(dmq_worker_t* worker)
 {
 {
 	memset(worker, 0, sizeof(*worker));
 	memset(worker, 0, sizeof(*worker));
-	lock_init(&worker->lock);
-	// acquire the lock for the first time - so that dmq_worker_loop blocks
-	lock_get(&worker->lock);
+	if (worker_usleep <= 0) {
+		lock_init(&worker->lock);
+		// acquire the lock for the first time - so that dmq_worker_loop blocks
+		lock_get(&worker->lock);
+	}
 	worker->queue = alloc_job_queue();
 	worker->queue = alloc_job_queue();
 }
 }