فهرست منبع

dmq: rework node status and add default response callback

- rework node statuses to: DMN_NODE_ACTIVE, DMQ_NODE_NOT_ACTIVE,
  DMQ_NODE_DISABLED
- add default callback to be used when NULL
- add 3 modparams related to fail_count; increment fail_count only on 408;
  print fail count on rpc list_nodes command
Stefan Mititelu 6 ماه پیش
والد
کامیت
e9af7b5653

+ 8 - 2
src/modules/dmq/dmq.c

@@ -69,6 +69,9 @@ int dmq_multi_notify = 0;
 static sip_uri_t dmq_notification_uri = {0};
 int dmq_ping_interval = 60;
 int dmq_remove_inactive = 1;
+int dmq_fail_count_enabled = 0;
+int dmq_fail_count_threshold_not_active = 0;
+int dmq_fail_count_threshold_disabled = 1;
 
 /* TM bind */
 struct tm_binds _dmq_tmb = {0};
@@ -127,6 +130,9 @@ static param_export_t params[] = {
 	{"multi_notify", PARAM_INT, &dmq_multi_notify},
 	{"worker_usleep", PARAM_INT, &dmq_worker_usleep},
 	{"remove_inactive", PARAM_INT, &dmq_remove_inactive},
+	{"fail_count_enabled", PARAM_INT, &dmq_fail_count_enabled},
+	{"fail_count_threshold_not_active", PARAM_INT, &dmq_fail_count_threshold_not_active},
+	{"fail_count_threshold_disabled", PARAM_INT, &dmq_fail_count_threshold_disabled},
 	{0, 0, 0}
 };
 
@@ -423,11 +429,11 @@ static void dmq_rpc_list_nodes(rpc_t *rpc, void *c)
 		ip_addr2sbuf(&cur->ip_address, ip, IP6_MAX_STR_SIZE);
 		if(rpc->add(c, "{", &h) < 0)
 			goto error;
-		if(rpc->struct_add(h, "SSssSdd", "host", &cur->uri.host, "port",
+		if(rpc->struct_add(h, "SSssSddd", "host", &cur->uri.host, "port",
 				   &cur->uri.port, "proto", get_proto_name(cur->uri.proto),
 				   "resolved_ip", ip, "status", dmq_get_status_str(cur->status),
 				   "last_notification", cur->last_notification, "local",
-				   cur->local)
+				   cur->local, "fail_count", cur->fail_count)
 				< 0)
 			goto error;
 		cur = cur->next;

+ 3 - 0
src/modules/dmq/dmq.h

@@ -51,6 +51,9 @@ extern sip_uri_t dmq_server_uri;
 extern str_list_t *dmq_notification_address_list;
 extern int dmq_multi_notify;
 extern int dmq_remove_inactive;
+extern int dmq_fail_count_enabled;
+extern int dmq_fail_count_threshold_not_active;
+extern int dmq_fail_count_threshold_disabled;
 /* sl and tm */
 extern struct tm_binds _dmq_tmb;
 extern sl_api_t _dmq_slb;

+ 5 - 1
src/modules/dmq/dmq_funcs.c

@@ -252,7 +252,11 @@ int dmq_send_message(dmq_peer_t *peer, str *body, dmq_node_t *node,
 		goto error;
 	}
 	memset(cb_param, 0, sizeof(*cb_param));
-	cb_param->resp_cback = *resp_cback;
+	if(resp_cback == NULL) {
+		cb_param->resp_cback = dmq_default_resp_callback;
+	} else {
+		cb_param->resp_cback = *resp_cback;
+	}
 	cb_param->node = shm_dup_node(node);
 	if(cb_param->node == NULL) {
 		LM_ERR("error building callback parameter\n");

+ 87 - 13
src/modules/dmq/dmqnode.c

@@ -35,9 +35,8 @@ dmq_node_t *dmq_notification_node;
 str dmq_node_status_str = str_init("status");
 /* possible values */
 str dmq_node_active_str = str_init("active");
+str dmq_node_not_active_str = str_init("not_active");
 str dmq_node_disabled_str = str_init("disabled");
-str dmq_node_timeout_str = str_init("timeout");
-str dmq_node_pending_str = str_init("pending");
 
 /**
  * @brief get the string status of the node
@@ -48,15 +47,12 @@ str *dmq_get_status_str(int status)
 		case DMQ_NODE_ACTIVE: {
 			return &dmq_node_active_str;
 		}
+		case DMQ_NODE_NOT_ACTIVE: {
+			return &dmq_node_not_active_str;
+		}
 		case DMQ_NODE_DISABLED: {
 			return &dmq_node_disabled_str;
 		}
-		case DMQ_NODE_TIMEOUT: {
-			return &dmq_node_timeout_str;
-		}
-		case DMQ_NODE_PENDING: {
-			return &dmq_node_pending_str;
-		}
 		default: {
 			return 0;
 		}
@@ -134,12 +130,10 @@ int set_dmq_node_params(dmq_node_t *node, param_t *params)
 	if(status) {
 		if(STR_EQ(*status, dmq_node_active_str)) {
 			node->status = DMQ_NODE_ACTIVE;
-		} else if(STR_EQ(*status, dmq_node_timeout_str)) {
-			node->status = DMQ_NODE_TIMEOUT;
+		} else if(STR_EQ(*status, dmq_node_not_active_str)) {
+			node->status = DMQ_NODE_NOT_ACTIVE;
 		} else if(STR_EQ(*status, dmq_node_disabled_str)) {
 			node->status = DMQ_NODE_DISABLED;
-		} else if(STR_EQ(*status, dmq_node_pending_str)) {
-			node->status = DMQ_NODE_PENDING;
 		} else {
 			LM_ERR("invalid status parameter: %.*s\n", STR_FMT(status));
 			goto error;
@@ -155,7 +149,7 @@ error:
  */
 int set_default_dmq_node_params(dmq_node_t *node)
 {
-	node->status = DMQ_NODE_PENDING;
+	node->status = DMQ_NODE_ACTIVE;
 	return 0;
 }
 
@@ -462,6 +456,62 @@ int update_dmq_node_status(dmq_node_list_t *list, dmq_node_t *node, int status)
 	return 0;
 }
 
+/**
+ * @brief update status of existing dmq node, when 408 timeout received
+ */
+int update_dmq_node_status_on_timeout(
+		dmq_node_list_t *list, dmq_node_t *node, int fail_count_status)
+{
+	dmq_node_t *cur;
+	lock_get(&list->lock);
+	cur = list->nodes;
+	while(cur) {
+		if(cmp_dmq_node(cur, node)) {
+			/* if node has specific status */
+			if(cur->status & fail_count_status) {
+				/* update fail_count*/
+				cur->fail_count++;
+
+				/* update state possibly based on fail_count */
+				/* put the node from not_active to disabled state */
+				if(cur->fail_count > dmq_fail_count_threshold_disabled
+						&& cur->status == DMQ_NODE_NOT_ACTIVE) {
+					LM_WARN("move to disabled: updated fail_count=%d "
+							"fail_threshold_not_active=%d "
+							"fail_threshold_disabled=%d "
+							"host=%.*s port=%.*s\n",
+							cur->fail_count,
+							dmq_fail_count_threshold_not_active,
+							dmq_fail_count_threshold_disabled,
+							node->uri.host.len, node->uri.host.s,
+							node->uri.port.len, node->uri.port.s);
+					cur->status = DMQ_NODE_DISABLED;
+
+					/* put the node from active to not_active state */
+				} else if(cur->fail_count > dmq_fail_count_threshold_not_active
+						  && cur->status == DMQ_NODE_ACTIVE) {
+					LM_WARN("move to not_active: cur->fail_count=%d "
+							"fail_threshold_not_active=%d "
+							"fail_threshold_disabled=%d "
+							"host=%.*s port=%.*s\n",
+							cur->fail_count,
+							dmq_fail_count_threshold_not_active,
+							dmq_fail_count_threshold_disabled,
+							node->uri.host.len, node->uri.host.s,
+							node->uri.port.len, node->uri.port.s);
+					cur->status = DMQ_NODE_NOT_ACTIVE;
+				}
+			}
+			lock_release(&list->lock);
+			return 1;
+		}
+		cur = cur->next;
+	}
+	lock_release(&list->lock);
+	return 0;
+}
+
+
 /**
  * @brief build dmq node string
  */
@@ -504,3 +554,27 @@ int build_node_str(dmq_node_t *node, char *buf, int buflen)
 	len += dmq_get_status_str(node->status)->len;
 	return len;
 }
+
+/**
+ * @brief reset fail counter
+ */
+int reset_dmq_node_fail_count(dmq_node_list_t *list, dmq_node_t *node)
+{
+	dmq_node_t *cur;
+	LM_DBG("trying to acquire dmq_node_list->lock\n");
+	lock_get(&list->lock);
+	LM_DBG("acquired dmq_node_list->lock\n");
+	cur = list->nodes;
+	while(cur) {
+		if(cmp_dmq_node(cur, node)) {
+			cur->fail_count = 0;
+			lock_release(&list->lock);
+			LM_DBG("released dmq_node_list->lock\n");
+			return 1;
+		}
+		cur = cur->next;
+	}
+	lock_release(&list->lock);
+	LM_DBG("released dmq_node_list->lock\n");
+	return 0;
+}

+ 7 - 3
src/modules/dmq/dmqnode.h

@@ -37,13 +37,13 @@
 
 #define NBODY_LEN 1024
 #define DMQ_NODE_ACTIVE 1 << 1
-#define DMQ_NODE_TIMEOUT 1 << 2
+#define DMQ_NODE_NOT_ACTIVE 1 << 2
 #define DMQ_NODE_DISABLED 1 << 3
-#define DMQ_NODE_PENDING 1 << 4
 
 typedef struct dmq_node
 {
-	int local;	  /* local type set means the dmq dmqnode == self */
+	int fail_count; /* counts how many times node responded with response code different than 200 OK */
+	int local;		/* local type set means the dmq dmqnode == self */
 	str orig_uri; /* original uri string - e.g. sip:127.0.0.1:5060;passive=true */
 	struct sip_uri uri;		   /* parsed uri string */
 	struct ip_addr ip_address; /* resolved IP address */
@@ -76,6 +76,8 @@ int dmq_node_del_by_uri(dmq_node_list_t *list, str *suri);
 int cmp_dmq_node(dmq_node_t *node, dmq_node_t *cmpnode);
 int cmp_dmq_node_ip(dmq_node_t *node, dmq_node_t *cmpnode);
 int update_dmq_node_status(dmq_node_list_t *list, dmq_node_t *node, int status);
+int update_dmq_node_status_on_timeout(
+		dmq_node_list_t *list, dmq_node_t *node, int fail_count_status);
 dmq_node_t *shm_dup_node(dmq_node_t *node);
 void destroy_dmq_node(dmq_node_t *node, int shm);
 void shm_free_node(dmq_node_t *node);
@@ -85,6 +87,8 @@ int set_dmq_node_params(dmq_node_t *node, param_t *params);
 str *dmq_get_status_str(int status);
 int build_node_str(dmq_node_t *node, char *buf, int buflen);
 
+int reset_dmq_node_fail_count(dmq_node_list_t *list, dmq_node_t *node);
+
 extern dmq_node_t *dmq_self_node;
 extern dmq_node_t *dmq_notification_node;
 

+ 106 - 3
src/modules/dmq/doc/dmq_admin.xml

@@ -52,6 +52,27 @@
 	major version. Internal structures can be incompatible between different
 	major versions and can lead to crashes or unexpected behaviour.
 	</para>
+	<para>
+	Possible DMQ node statuses:
+	<itemizedlist>
+	<listitem>
+		<para>
+			<emphasis>DMQ_NODE_ACTIVE</emphasis> "active" = node is in dmq node list and responded with response code 200 OK, to KDMQ message.
+			At startup, all nodes are considered active, regardless of their actual state.
+		</para>
+	</listitem>
+	<listitem>
+		<para>
+			<emphasis>DMQ_NODE_NOT_ACTIVE</emphasis> "not_active" = node is in dmq node list and did not respond to KDMQ message (408 timeout).
+		</para>
+	</listitem>
+	<listitem>
+		<para>
+			<emphasis>DMQ_NODE_DISABLED</emphasis> "disabled" = node will be deleted from dmq node list.
+		</para>
+	</listitem>
+	</itemizedlist>
+	</para>
 	<section>
 		<title>KDMQ Request</title>
 		<para>
@@ -81,7 +102,7 @@ Content-Type: text/plain
 
 sip:192.168.40.16:5060;status=active
 sip:192.168.40.15:5060;status=disabled
-sip:192.168.40.17:5060;status=active
+sip:192.168.40.17:5060;status=not_active
 
 ...
 </programlisting>
@@ -300,9 +321,9 @@ modparam("dmq", "ping_interval", 90)
 	<section id="dmq.p.remove_inactive">
 		<title><varname>remove_inactive</varname>(int)</title>
 		<para>
-		A value of zero disable removing dmq nodes. Node status will be changed to pending.
+		A value of zero disable removing dmq nodes. Node status will be changed to not_active.
 		A non-zero value (default is 1) enable removing nodes.
-		For node that is defined as notification address status will be changed to pending.
+		For node that is defined as notification address status will be changed to not_active.
 		Otherwise it will be marked as disabled and then will be removed.
 		</para>
 		<para>
@@ -317,6 +338,88 @@ modparam("dmq", "remove_inactive", 0)
 </programlisting>
 		</example>
 	</section>
+	<section id="dmq.p.fail_count_enabled">
+		<title><varname>fail_count_enabled</varname>(int)</title>
+		<para>
+			Enable per node counting of failed responses.
+		</para>
+		<para>
+		<emphasis>Default value is <quote>0</quote> (disabled).</emphasis>
+		</para>
+		<example>
+		<title>Set <varname>fail_count_enabled</varname> parameter</title>
+		<programlisting format="linespecific">
+...
+modparam("dmq", "fail_count_enabled", 1)
+...
+		</programlisting>
+		</example>
+	</section>
+	<section id="dmq.p.fail_count_threshold_not_active">
+		<title><varname>fail_count_threshold_not_active</varname>(int)</title>
+		<para>
+			DMQ node status change from "active" to "not_active" if 408 fail counts are greater than this threshold value:
+			This threshold can be reached both via internal DMQ notification ping mechanism, and other modules that use dmq to broadcast messages:
+			<itemizedlist>
+			<listitem>
+				<para>
+			Fail counts reset every time a node responds with response code 200 OK, on response callback of notification ping.
+				</para>
+			</listitem>
+			<listitem>
+				<para>
+			Fail counts increase every time a node responds with response code 408, on response callback of notification ping.
+				</para>
+			</listitem>
+			<listitem>
+				<para>
+				Fail counts increase every time a node responds with response code 408, on response callback of other modules that use dmq, <emphasis>only</emphasis> if node is in "active" state.
+				</para>
+			</listitem>
+			</itemizedlist>
+		</para>
+		<para>
+		<emphasis>Default value is <quote>0</quote>.</emphasis> Move node to "not_active" state on first fail.
+		</para>
+		<example>
+		<title>Set <varname>fail_count_threshold_not_active</varname> parameter</title>
+		<programlisting format="linespecific">
+...
+modparam("dmq", "fail_count_threshold_not_active", 100)
+...
+		</programlisting>
+		</example>
+	</section>
+	<section id="dmq.p.fail_count_threshold_disabled">
+		<title><varname>fail_count_threshold_disabled</varname>(int)</title>
+		<para>
+			DMQ node status change from "not_active" to "disabled" if 408 fail counts are greater than this threshold value.
+			This threshold can be reached only via internal DMQ notification ping mechanism:
+			<itemizedlist>
+			<listitem>
+				<para>
+			Fail counts reset every time a node responds with response code 200 OK, on response callback of notification ping.
+				</para>
+			</listitem>
+			<listitem>
+				<para>
+			Fail counts increase every time a node responds with response code 408, on response callback of notification ping.
+				</para>
+			</listitem>
+			</itemizedlist>
+		</para>
+		<para>
+		<emphasis>Default value is <quote>1</quote>.</emphasis> Move node to "disabled" state on second fail.
+		</para>
+		<example>
+		<title>Set <varname>fail_count_threshold_disabled</varname> parameter</title>
+		<programlisting format="linespecific">
+...
+modparam("dmq", "fail_count_threshold_disabled", 105)
+...
+		</programlisting>
+		</example>
+	</section>
 	</section>
 
 	<section>

+ 65 - 11
src/modules/dmq/notification_peer.c

@@ -32,6 +32,7 @@
 str dmq_notification_content_type = str_init("text/plain");
 dmq_resp_cback_t dmq_notification_resp_callback = {
 		&notification_resp_callback_f, 0};
+dmq_resp_cback_t dmq_default_resp_callback = {&default_resp_callback_f, 0};
 
 int *dmq_init_callback_done = 0;
 
@@ -617,8 +618,13 @@ int notification_resp_callback_f(
 	int nodes_recv;
 	str_list_t *slp;
 
-	LM_DBG("notification_callback_f triggered [%p %d %p]\n", msg, code, param);
+	LM_DBG("triggered [%p %d %p]\n", msg, code, param);
 	if(code == 200) {
+		if(dmq_fail_count_enabled) {
+			/* reset node fail counter */
+			reset_dmq_node_fail_count(dmq_node_list, node);
+		}
+
 		/* be sure that the node that answered is in active state */
 		update_dmq_node_status(dmq_node_list, node, DMQ_NODE_ACTIVE);
 		nodes_recv = extract_node_list(dmq_node_list, msg);
@@ -628,28 +634,48 @@ int notification_resp_callback_f(
 			run_init_callbacks();
 		}
 	} else if(code == 408) {
-		if(!dmq_remove_inactive) {
-			/* put the node in pending state */
-			update_dmq_node_status(dmq_node_list, node, DMQ_NODE_PENDING);
+		LM_WARN("timeout: previous fail_count=%d fail_threshold_not_active=%d "
+				"fail_threshold_disabled=%d "
+				"host=%.*s port=%.*s\n",
+				node->fail_count, dmq_fail_count_threshold_not_active,
+				dmq_fail_count_threshold_disabled, node->uri.host.len,
+				node->uri.host.s, node->uri.port.len, node->uri.port.s);
+
+		if(node->status == DMQ_NODE_DISABLED) {
+			/* deleting node - the server did not respond */
+			LM_ERR("deleting server node %.*s because of failed request\n",
+					STR_FMT(&node->orig_uri));
+			ret = del_dmq_node(dmq_node_list, node);
+			LM_DBG("del_dmq_node returned %d\n", ret);
+			return 0;
+		}
+
+		if(dmq_fail_count_enabled) {
+			/* alwaws increment fail_count here and possibly update state */
+			update_dmq_node_status_on_timeout(dmq_node_list, node,
+					(DMQ_NODE_ACTIVE | DMQ_NODE_NOT_ACTIVE
+							| DMQ_NODE_DISABLED));
 			return 0;
 		}
+
 		/* TODO this probably do not work for dmq_multi_notify */
 		slp = dmq_notification_address_list;
 		while(slp != NULL) {
 			if(STR_EQ(node->orig_uri, slp->s)) {
 				LM_ERR("not deleting notification peer [%.*s]\n",
 						STR_FMT(&slp->s));
-				update_dmq_node_status(dmq_node_list, node, DMQ_NODE_PENDING);
+				update_dmq_node_status(
+						dmq_node_list, node, DMQ_NODE_NOT_ACTIVE);
 				return 0;
 			}
 			slp = slp->next;
 		}
-		if(node->status == DMQ_NODE_DISABLED) {
-			/* deleting node - the server did not respond */
-			LM_ERR("deleting server node %.*s because of failed request\n",
-					STR_FMT(&node->orig_uri));
-			ret = del_dmq_node(dmq_node_list, node);
-			LM_DBG("del_dmq_node returned %d\n", ret);
+
+		/* update status only if fail_count mechanism is disabled
+		 * otherwise, let fail_count reach threshold before update status */
+		if(!dmq_remove_inactive) {
+			/* put the node in not_active state */
+			update_dmq_node_status(dmq_node_list, node, DMQ_NODE_NOT_ACTIVE);
 		} else {
 			/* put the node in disabled state and wait for the next ping before deleting it */
 			update_dmq_node_status(dmq_node_list, node, DMQ_NODE_DISABLED);
@@ -657,3 +683,31 @@ int notification_resp_callback_f(
 	}
 	return 0;
 }
+
+/**
+ * @brief default response callback
+ */
+int default_resp_callback_f(
+		struct sip_msg *msg, int code, dmq_node_t *node, void *param)
+{
+	LM_DBG("triggered [%p %d %p]\n", msg, code, param);
+
+	/* if node timeout */
+	if(code == 408) {
+		LM_WARN("timeout: previous fail_count=%d fail_threshold_not_active=%d "
+				"fail_threshold_disabled=%d "
+				"host=%.*s port=%.*s\n",
+				node->fail_count, dmq_fail_count_threshold_not_active,
+				dmq_fail_count_threshold_disabled, node->uri.host.len,
+				node->uri.host.s, node->uri.port.len, node->uri.port.s);
+
+		if(dmq_fail_count_enabled) {
+			/* increment fail_count here and possibly update state, if node in active state */
+			/* this will prevent other modules that use DMQ to affect node state past the not_active state */
+			update_dmq_node_status_on_timeout(
+					dmq_node_list, node, DMQ_NODE_ACTIVE);
+		}
+	}
+
+	return 0;
+}

+ 3 - 0
src/modules/dmq/notification_peer.h

@@ -55,6 +55,9 @@ dmq_node_t *add_server_and_notify(str_list_t *server_list);
 /* helper functions */
 extern int notification_resp_callback_f(
 		struct sip_msg *msg, int code, dmq_node_t *node, void *param);
+extern int default_resp_callback_f(
+		struct sip_msg *msg, int code, dmq_node_t *node, void *param);
 extern dmq_resp_cback_t dmq_notification_resp_callback;
+extern dmq_resp_cback_t dmq_default_resp_callback;
 
 #endif