瀏覽代碼

Merge pull request #1667 from kamailio/cchance/ht-dmq-sync

htable: added startup synchronization over dmq
Daniel-Constantin Mierla 7 年之前
父節點
當前提交
fb63da5ff6
共有 4 個文件被更改,包括 387 次插入40 次删除
  1. 22 0
      src/modules/htable/doc/htable_admin.xml
  2. 357 38
      src/modules/htable/ht_dmq.c
  3. 5 1
      src/modules/htable/ht_dmq.h
  4. 3 1
      src/modules/htable/htable.c

+ 22 - 0
src/modules/htable/doc/htable_admin.xml

@@ -647,6 +647,28 @@ modparam("htable", "db_expires", 1)
 ...
 modparam("htable", "enable_dmq", 1)
 ...
+</programlisting>
+		</example>
+	</section>
+	<section id="htable.p.dmq_init_sync">
+		<title><varname>dmq_init_sync</varname> (integer)</title>
+		<para>
+			If set to 1, will request synchronization from other nodes at startup. It applies
+			to all tables having the "dmqreplicate" parameter set. As above, it is important to 
+			ensure the definition (size, autoexpire etc.) of replicated tables is identical 
+			across all instances.
+		</para>
+		<para>
+		<emphasis>
+			Default value is 0.
+		</emphasis>
+		</para>
+		<example>
+		<title>Set <varname>dmq_init_sync</varname> parameter</title>
+		<programlisting format="linespecific">
+...
+modparam("htable", "dmq_init_sync", 1)
+...
 </programlisting>
 		</example>
 	</section>

+ 357 - 38
src/modules/htable/ht_dmq.c

@@ -24,11 +24,6 @@
 #include "ht_dmq.h"
 #include "ht_api.h"
 
-static str ht_dmq_content_type = str_init("application/json");
-static str dmq_200_rpl  = str_init("OK");
-static str dmq_400_rpl  = str_init("Bad Request");
-static str dmq_500_rpl  = str_init("Server Internal Error");
-
 typedef struct _ht_dmq_repdata {
 	int action;
 	str htname;
@@ -39,10 +34,155 @@ typedef struct _ht_dmq_repdata {
 	int expire;
 } ht_dmq_repdata_t;
 
+typedef struct _ht_dmq_jdoc_cell_group {
+	int count;
+	int size;
+	srjson_doc_t jdoc;
+	srjson_t *jdoc_cells;
+} ht_dmq_jdoc_cell_group_t;
+
+static str ht_dmq_content_type = str_init("application/json");
+static str dmq_200_rpl  = str_init("OK");
+static str dmq_400_rpl  = str_init("Bad Request");
+static str dmq_500_rpl  = str_init("Server Internal Error");
+static int dmq_cell_group_empty_size = 12; // {"cells":[]}
+static int dmq_cell_group_max_size = 60000;
+static ht_dmq_jdoc_cell_group_t ht_dmq_jdoc_cell_group;
+int ht_dmq_init_sync;
+
 dmq_api_t ht_dmqb;
 dmq_peer_t* ht_dmq_peer = NULL;
 dmq_resp_cback_t ht_dmq_resp_callback = {&ht_dmq_resp_callback_f, 0};
 
+int ht_dmq_send(str* body, dmq_node_t* node);
+int ht_dmq_send_sync(dmq_node_t* node);
+int ht_dmq_handle_sync(srjson_doc_t* jdoc);
+
+static int ht_dmq_cell_group_init(void) {
+
+	if (ht_dmq_jdoc_cell_group.jdoc.root)
+		return 0; // already initialised
+
+	ht_dmq_jdoc_cell_group.count = 0;
+	ht_dmq_jdoc_cell_group.size = dmq_cell_group_empty_size;
+
+	srjson_InitDoc(&ht_dmq_jdoc_cell_group.jdoc, NULL);
+
+	ht_dmq_jdoc_cell_group.jdoc.root = srjson_CreateObject(&ht_dmq_jdoc_cell_group.jdoc);
+	if (ht_dmq_jdoc_cell_group.jdoc.root==NULL) {
+		LM_ERR("cannot create json root object! \n");
+		return -1;
+	}
+
+	ht_dmq_jdoc_cell_group.jdoc_cells = srjson_CreateArray(&ht_dmq_jdoc_cell_group.jdoc);
+	if (ht_dmq_jdoc_cell_group.jdoc_cells==NULL) {
+		LM_ERR("cannot create json cells array! \n");
+		srjson_DestroyDoc(&ht_dmq_jdoc_cell_group.jdoc);
+		return -1;
+	}
+
+	return 0;
+}
+
+static int ht_dmq_cell_group_write(str* htname, ht_cell_t* ptr) {
+
+	// jsonify cell and add to array
+
+	str tmp;
+	srjson_doc_t *jdoc = &ht_dmq_jdoc_cell_group.jdoc;
+	srjson_t *jdoc_cells = ht_dmq_jdoc_cell_group.jdoc_cells;
+	srjson_t * jdoc_cell = srjson_CreateObject(jdoc);
+
+	if(!jdoc_cell) {
+		LM_ERR("cannot create cell json root\n");
+		return -1;
+	}
+
+	// add json overhead
+	if(ptr->flags&AVP_VAL_STR) {
+		ht_dmq_jdoc_cell_group.size += 54; // {"htname":"","cname":"","type":,"strval":"","expire":}
+	} else {
+		ht_dmq_jdoc_cell_group.size += 52; // {"htname":"","cname":"","type":,"intval":,"expire":}
+	}
+
+	srjson_AddStrToObject(jdoc, jdoc_cell, "htname", htname->s, htname->len);
+	ht_dmq_jdoc_cell_group.size += htname->len;
+
+	srjson_AddStrToObject(jdoc, jdoc_cell, "cname", ptr->name.s, ptr->name.len);
+	ht_dmq_jdoc_cell_group.size += ptr->name.len;
+
+	if (ptr->flags&AVP_VAL_STR) {
+		srjson_AddNumberToObject(jdoc, jdoc_cell, "type", AVP_VAL_STR);
+		ht_dmq_jdoc_cell_group.size += 1;
+		srjson_AddStrToObject(jdoc, jdoc_cell, "strval", ptr->value.s.s, ptr->value.s.len);
+		ht_dmq_jdoc_cell_group.size += ptr->value.s.len;
+	} else {
+		srjson_AddNumberToObject(jdoc, jdoc_cell, "type", 0);
+		ht_dmq_jdoc_cell_group.size += 1;
+		srjson_AddNumberToObject(jdoc, jdoc_cell, "intval", ptr->value.n);
+		tmp.s = sint2str((long)ptr->value.n, &tmp.len);
+		ht_dmq_jdoc_cell_group.size += tmp.len;
+	}
+
+	srjson_AddNumberToObject(jdoc, jdoc_cell, "expire", ptr->expire);
+	tmp.s = sint2str((long)ptr->expire, &tmp.len);
+	ht_dmq_jdoc_cell_group.size += tmp.len;
+
+	srjson_AddItemToArray(jdoc, jdoc_cells, jdoc_cell);
+
+	ht_dmq_jdoc_cell_group.count++;
+
+	return 0;
+}
+
+static int ht_dmq_cell_group_flush(dmq_node_t* node) {
+
+	srjson_doc_t *jdoc = &ht_dmq_jdoc_cell_group.jdoc;
+	srjson_t *jdoc_cells = ht_dmq_jdoc_cell_group.jdoc_cells;
+
+	srjson_AddItemToObject(jdoc, jdoc->root, "cells", jdoc_cells);
+
+	LM_DBG("json[%s]\n", srjson_PrintUnformatted(jdoc, jdoc->root));
+	jdoc->buf.s = srjson_PrintUnformatted(jdoc, jdoc->root);
+	if(jdoc->buf.s==NULL) {
+		LM_ERR("unable to serialize data\n");
+		return -1;
+	}
+	jdoc->buf.len = strlen(jdoc->buf.s);
+
+	LM_DBG("sending serialized data %.*s\n", jdoc->buf.len, jdoc->buf.s);
+	if (ht_dmq_send(&jdoc->buf, node)!=0) {
+		LM_ERR("unable to send data\n");
+		return -1;
+	}
+
+	LM_DBG("jdoc size[%d]\n", ht_dmq_jdoc_cell_group.size);
+
+	srjson_Delete(jdoc, jdoc_cells);
+	ht_dmq_jdoc_cell_group.jdoc_cells = srjson_CreateArray(&ht_dmq_jdoc_cell_group.jdoc);
+	if (ht_dmq_jdoc_cell_group.jdoc_cells==NULL) {
+		LM_ERR("cannot re-create json cells array! \n");
+		return -1;
+	}
+
+	ht_dmq_jdoc_cell_group.count = 0;
+	ht_dmq_jdoc_cell_group.size = dmq_cell_group_empty_size;
+
+	return 0;
+}
+
+static void ht_dmq_cell_group_destroy() {
+
+	srjson_doc_t *jdoc = &ht_dmq_jdoc_cell_group.jdoc;
+
+	if(jdoc->buf.s!=NULL) {
+		jdoc->free_fn(jdoc->buf.s);
+		jdoc->buf.s = NULL;
+	}
+	srjson_DestroyDoc(jdoc);
+
+}
+
 /**
  * @brief add notification peer
  */
@@ -59,7 +199,7 @@ int ht_dmq_initialize()
 	}
 
 	not_peer.callback = ht_dmq_handle_msg;
-	not_peer.init_callback = NULL;
+	not_peer.init_callback = (ht_dmq_init_sync ? ht_dmq_request_sync : NULL);
 	not_peer.description.s = "htable";
 	not_peer.description.len = 6;
 	not_peer.peer_id.s = "htable";
@@ -76,14 +216,20 @@ error:
 	return -1;
 }
 
-int ht_dmq_broadcast(str* body)
-{
+int ht_dmq_send(str* body, dmq_node_t* node) {
 	if (!ht_dmq_peer) {
 		LM_ERR("ht_dmq_peer is null!\n");
 		return -1;
 	}
-	LM_DBG("sending broadcast...\n");
-	ht_dmqb.bcast_message(ht_dmq_peer, body, 0, &ht_dmq_resp_callback, 1, &ht_dmq_content_type);
+	if (node) {
+		LM_DBG("sending dmq message ...\n");
+		ht_dmqb.send_message(ht_dmq_peer, body, node,
+				&ht_dmq_resp_callback, 1, &ht_dmq_content_type);
+	} else {
+		LM_DBG("sending dmq broadcast...\n");
+		ht_dmqb.bcast_message(ht_dmq_peer, body, 0,
+				&ht_dmq_resp_callback, 1, &ht_dmq_content_type);
+	}
 	return 0;
 }
 
@@ -138,35 +284,45 @@ int ht_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq
 		}
 	}
 
-	for(it=jdoc.root->child; it; it = it->next)
-	{
-		LM_DBG("found field: %s\n", it->string);
-		if (strcmp(it->string, "action")==0) {
-			action = SRJSON_GET_INT(it);
-		} else if (strcmp(it->string, "htname")==0) {
-			htname.s = it->valuestring;
-			htname.len = strlen(htname.s);
-		} else if (strcmp(it->string, "cname")==0) {
-			cname.s = it->valuestring;
-			cname.len = strlen(cname.s);
-		} else if (strcmp(it->string, "type")==0) {
-			type = SRJSON_GET_INT(it);
-		} else if (strcmp(it->string, "strval")==0) {
-			val.s.s = it->valuestring;
-			val.s.len = strlen(val.s.s);
-		} else if (strcmp(it->string, "intval")==0) {
-			val.n = SRJSON_GET_INT(it);
-		} else if (strcmp(it->string, "mode")==0) {
-			mode = SRJSON_GET_INT(it);
+	if (unlikely(strcmp(jdoc.root->child->string, "cells")==0)) {
+		ht_dmq_handle_sync(&jdoc);
+	} else {
+
+		for(it=jdoc.root->child; it; it = it->next)
+		{
+			LM_DBG("found field: %s\n", it->string);
+			if (strcmp(it->string, "action")==0) {
+				action = SRJSON_GET_INT(it);
+			} else if (strcmp(it->string, "htname")==0) {
+				htname.s = it->valuestring;
+				htname.len = strlen(htname.s);
+			} else if (strcmp(it->string, "cname")==0) {
+				cname.s = it->valuestring;
+				cname.len = strlen(cname.s);
+			} else if (strcmp(it->string, "type")==0) {
+				type = SRJSON_GET_INT(it);
+			} else if (strcmp(it->string, "strval")==0) {
+				val.s.s = it->valuestring;
+				val.s.len = strlen(val.s.s);
+			} else if (strcmp(it->string, "intval")==0) {
+				val.n = SRJSON_GET_INT(it);
+			} else if (strcmp(it->string, "mode")==0) {
+				mode = SRJSON_GET_INT(it);
+			} else {
+				LM_ERR("unrecognized field in json object\n");
+				goto invalid;
+			}
+		}
+
+		if (unlikely(action == HT_DMQ_SYNC)) {
+			ht_dmq_send_sync(dmq_node);
 		} else {
-			LM_ERR("unrecognized field in json object\n");
-			goto invalid;
+			if (ht_dmq_replay_action(action, &htname, &cname, type, &val, mode)!=0) {
+				LM_ERR("failed to replay action\n");
+				goto error;
+			}
 		}
-	}
 
-	if (ht_dmq_replay_action(action, &htname, &cname, type, &val, mode)!=0) {
-		LM_ERR("failed to replay action\n");
-		goto error;
 	}
 
 	srjson_DestroyDoc(&jdoc);
@@ -222,7 +378,7 @@ int ht_dmq_replicate_action(ht_dmq_action_t action, str* htname, str* cname, int
 	if(jdoc.buf.s!=NULL) {
 		jdoc.buf.len = strlen(jdoc.buf.s);
 		LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
-		if (ht_dmq_broadcast(&jdoc.buf)!=0) {
+		if (ht_dmq_send(&jdoc.buf, 0)!=0) {
 			goto error;
 		}
 		jdoc.free_fn(jdoc.buf.s);
@@ -264,9 +420,172 @@ int ht_dmq_replay_action(ht_dmq_action_t action, str* htname, str* cname, int ty
 	} else if (action==HT_DMQ_RM_CELL_RE) {
 		return ht_rm_cell_re(&val->s, ht, mode);
 	} else {
-		LM_ERR("unrecognized action");
+		LM_ERR("unrecognized action\n");
+		return -1;
+	}
+}
+
+int ht_dmq_request_sync() {
+
+	srjson_doc_t jdoc;
+
+	LM_DBG("requesting sync from dmq peers\n");
+	srjson_InitDoc(&jdoc, NULL);
+
+	jdoc.root = srjson_CreateObject(&jdoc);
+	if(jdoc.root==NULL) {
+		LM_ERR("cannot create json root\n");
+		goto error;
+	}
+
+	srjson_AddNumberToObject(&jdoc, jdoc.root, "action", HT_DMQ_SYNC);
+	jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root);
+	if(jdoc.buf.s==NULL) {
+		LM_ERR("unable to serialize data\n");
+		goto error;
+	}
+	jdoc.buf.len = strlen(jdoc.buf.s);
+	LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
+	if (ht_dmq_send(&jdoc.buf, 0)!=0) {
+		goto error;
+	}
+
+	jdoc.free_fn(jdoc.buf.s);
+	jdoc.buf.s = NULL;
+	srjson_DestroyDoc(&jdoc);
+	return 0;
+
+error:
+	if(jdoc.buf.s!=NULL) {
+		jdoc.free_fn(jdoc.buf.s);
+		jdoc.buf.s = NULL;
+	}
+	srjson_DestroyDoc(&jdoc);
+	return -1;
+}
+
+int ht_dmq_send_sync(dmq_node_t* node) {
+	ht_t *ht;
+	ht_cell_t *it;
+	time_t now;
+	int i;
+
+	ht = ht_get_root();
+	if(ht==NULL)
+	{
+		LM_DBG("no htables to sync!\n");
+		return 0;
+	}
+
+	if (ht_dmq_cell_group_init() < 0)
 		return -1;
+
+	now = time(NULL);
+
+	while (ht != NULL)
+	{
+		if (!ht->dmqreplicate)
+			goto skip;
+
+		for(i=0; i<ht->htsize; i++)
+		{
+			ht_slot_lock(ht, i);
+			it = ht->entries[i].first;
+			while(it)
+			{
+				if(ht->htexpire > 0) {
+					if (it->expire <= now) {
+						LM_DBG("skipping expired entry\n");
+						it = it->next;
+						continue;
+					}
+				}
+
+				if (ht_dmq_cell_group_write(&ht->name, it) < 0) {
+					ht_slot_unlock(ht, i);
+					goto error;
+				}
+
+				if (ht_dmq_jdoc_cell_group.size >= dmq_cell_group_max_size) {
+					LM_DBG("sending group count[%d]size[%d]\n", ht_dmq_jdoc_cell_group.count, ht_dmq_jdoc_cell_group.size);
+					if (ht_dmq_cell_group_flush(node) < 0) {
+						ht_slot_unlock(ht, i);
+						goto error;
+					}
+				}
+
+				it = it->next;
+			}
+			ht_slot_unlock(ht, i);
+		}
+
+skip:
+		ht = ht->next;
 	}
+
+	if (ht_dmq_cell_group_flush(node) < 0)
+		goto error;
+
+	ht_dmq_cell_group_destroy();
+	return 0;
+
+error:
+	ht_dmq_cell_group_destroy();
+	return -1;
+}
+
+int ht_dmq_handle_sync(srjson_doc_t* jdoc) {
+	LM_DBG("handling sync\n");
+
+	srjson_t* cells;
+	srjson_t* cell;
+	srjson_t* it;
+	str htname;
+	str cname;
+	int type;
+	int_str val;
+	int expire;
+	ht_t* ht;
+	time_t now;
+
+
+	cells = jdoc->root->child;
+	cell = cells->child;
+
+	now = time(NULL);
+
+	while (cell) {
+		for(it=cell->child; it; it = it->next) {
+			if (strcmp(it->string, "htname")==0) {
+				htname.s = it->valuestring;
+				htname.len = strlen(htname.s);
+			} else if (strcmp(it->string, "cname")==0) {
+				cname.s = it->valuestring;
+				cname.len = strlen(cname.s);
+			} else if (strcmp(it->string, "type")==0) {
+				type = SRJSON_GET_INT(it);
+			} else if (strcmp(it->string, "strval")==0) {
+				val.s.s = it->valuestring;
+				val.s.len = strlen(val.s.s);
+			} else if (strcmp(it->string, "intval")==0) {
+				val.n = SRJSON_GET_INT(it);
+			} else if (strcmp(it->string, "expire")==0) {
+				expire = SRJSON_GET_INT(it);
+			} else {
+				LM_WARN("unrecognized field in json object\n");
+			}
+		}
+
+		ht = ht_get_table(&htname);
+		if(ht==NULL)
+			LM_WARN("unable to get table %.*s\n", ht->name.len, ht->name.s);
+
+		if (ht_set_cell_ex(ht, &cname, type, &val, 0, expire - now) < 0)
+			LM_WARN("unable to set cell %.*s in table %.*s\n", cname.len, cname.s, ht->name.len, ht->name.s);
+
+		cell = cell->next;
+	}
+	return 0;
 }
 
 /**

+ 5 - 1
src/modules/htable/ht_dmq.h

@@ -31,18 +31,22 @@ extern dmq_api_t ht_dmqb;
 extern dmq_peer_t* ht_dmq_peer;
 extern dmq_resp_cback_t ht_dmq_resp_callback;
 
+int ht_dmq_init_sync;
+
 typedef enum {
 		HT_DMQ_NONE,
         HT_DMQ_SET_CELL,
         HT_DMQ_SET_CELL_EXPIRE,
         HT_DMQ_DEL_CELL,
-        HT_DMQ_RM_CELL_RE
+        HT_DMQ_RM_CELL_RE,
+        HT_DMQ_SYNC
 } ht_dmq_action_t;
 
 int ht_dmq_initialize();
 int ht_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node);
 int ht_dmq_replicate_action(ht_dmq_action_t action, str* htname, str* cname, int type, int_str* val, int mode);
 int ht_dmq_replay_action(ht_dmq_action_t action, str* htname, str* cname, int type, int_str* val, int mode);
+int ht_dmq_request_sync();
 int ht_dmq_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param);
 
 #endif

+ 3 - 1
src/modules/htable/htable.c

@@ -52,6 +52,7 @@ MODULE_VERSION
 int  ht_timer_interval = 20;
 int  ht_db_expires_flag = 0;
 int  ht_enable_dmq = 0;
+int  ht_dmq_init_sync = 0;
 int  ht_timer_procs = 0;
 static int ht_event_callback_mode = 0;
 
@@ -153,6 +154,7 @@ static param_export_t params[]={
 	{"timer_interval",      INT_PARAM, &ht_timer_interval},
 	{"db_expires",          INT_PARAM, &ht_db_expires_flag},
 	{"enable_dmq",          INT_PARAM, &ht_enable_dmq},
+	{"dmq_init_sync",       INT_PARAM, &ht_dmq_init_sync},
 	{"timer_procs",         PARAM_INT, &ht_timer_procs},
 	{"event_callback",      PARAM_STR, &ht_event_callback},
 	{"event_callback_mode", PARAM_INT, &ht_event_callback_mode},
@@ -218,7 +220,7 @@ static int mod_init(void)
 		}
 	}
 
-	if (ht_enable_dmq>0 && ht_dmq_initialize()!=0) {
+	if (ht_enable_dmq>0 && ht_dmq_initialize(ht_dmq_init_sync)!=0) {
 		LM_ERR("failed to initialize dmq integration\n");
 		return -1;
 	}