Răsfoiți Sursa

modified htable module so that it uses dmq for distributed tables
TODO
use dmq only on a per-table basis

Marius Bucur 14 ani în urmă
părinte
comite
50f1a5e0c2

+ 2 - 0
modules_k/dmq/bind_dmq.c

@@ -5,5 +5,7 @@
 
 int bind_dmq(dmq_api_t* api) {
 	api->register_dmq_peer = register_dmq_peer;
+	api->send_message = send_dmq_message;
+	api->bcast_message = bcast_dmq_message;
 	return 0;
 }

+ 7 - 0
modules_k/dmq/bind_dmq.h

@@ -2,9 +2,16 @@
 #define BIND_DMQ_H
 
 #include "peer.h"
+#include "dmqnode.h"
+#include "dmq_funcs.h"
+
+typedef int (*bcast_message_t)(dmq_peer_t* peer, str* body, dmq_node_t* except, dmq_resp_cback_t* resp_cback, int max_forwards);
+typedef int (*send_message_t)(dmq_peer_t* peer, str* body, dmq_node_t* node, dmq_resp_cback_t* resp_cback, int max_forwards);
 
 typedef struct dmq_api {
 	register_dmq_peer_t register_dmq_peer;
+	bcast_message_t bcast_message;
+	send_message_t send_message;
 } dmq_api_t;
 
 typedef int (*bind_dmq_f)(dmq_api_t* api);

+ 2 - 2
modules_k/dmq/dmq_funcs.c

@@ -160,10 +160,10 @@ void ping_servers(unsigned int ticks,void *param) {
 	str* body = build_notification_body();
 	int ret;
 	LM_DBG("ping_servers\n");
-	ret = bcast_dmq_message(dmq_notification_peer, body, notification_node, &notification_callback, 0);
+	ret = bcast_dmq_message(dmq_notification_peer, body, notification_node, &notification_callback, 1);
 	pkg_free(body->s);
 	pkg_free(body);
 	if(ret < 0) {
 		LM_ERR("error broadcasting message\n");
 	}
-}
+}

+ 0 - 1
modules_k/dmq/dmq_funcs.h

@@ -5,7 +5,6 @@
 #include "../../modules/tm/dlg.h"
 #include "../../modules/tm/tm_load.h"
 #include "../../config.h"
-#include "dmq.h"
 #include "peer.h"
 #include "worker.h"
 #include "dmqnode.h"

+ 5 - 4
modules_k/dmq/notification_peer.c

@@ -36,7 +36,7 @@ dmq_node_t* add_server_and_notify(str* server_address) {
 		goto error;
 	}
 	/* request initial list from the notification server */
-	if(request_nodelist(node, 1) < 0) {
+	if(request_nodelist(node, 2) < 0) {
 		LM_ERR("error requesting initial nodelist\n");
 		goto error;
 	}
@@ -132,6 +132,7 @@ int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp) {
 		LM_DBG("max forwards: %.*s\n", STR_FMT(&msg->maxforwards->body));
 		str2int(&msg->maxforwards->body, &maxforwards);
 	}
+	maxforwards--;
 	
 	nodes_recv = extract_node_list(node_list, msg);
 	LM_DBG("received %d new nodes\n", nodes_recv);
@@ -142,9 +143,9 @@ int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp) {
 	resp->resp_code = 200;
 	
 	/* if we received any new nodes tell about them to the others */
-	if(nodes_recv > 0 && maxforwards) {
+	if(nodes_recv > 0 && maxforwards > 0) {
 		/* maxforwards is set to 0 so that the message is will not be in a spiral */
-		bcast_dmq_message(dmq_notification_peer, response_body, 0, &notification_callback, 0);
+		bcast_dmq_message(dmq_notification_peer, response_body, 0, &notification_callback, maxforwards);
 	}
 	LM_DBG("broadcasted message\n");
 	pkg_free(response_body);
@@ -216,4 +217,4 @@ int notification_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node
 		LM_DBG("del_dmq_node returned %d\n", ret);
 	}
 	return 0;
-}
+}

+ 33 - 11
modules_k/htable/ht_serialize.c

@@ -1,10 +1,8 @@
-#include "ht_var.h"
-#include "../../str.h"
 #include "ht_serialize.h"
-#include "../../basex.h"
 
 /* snprintf - pretty ugly, but cds/serialize is unusable for the moment */
-int serialize_ht_pair(pv_value_t* val, str* htname, str* s) {
+int serialize_ht_pair(str* key, pv_value_t* val, str* htname, str* s) {
+	str encoded_key = {0, 0};
 	str encoded_val = {0, 0};
 	str encoded_htname = {0, 0};
 	int len;
@@ -17,7 +15,7 @@ int serialize_ht_pair(pv_value_t* val, str* htname, str* s) {
 		goto error;
 	}
 	if(val->rs.len) {
-		encoded_val.len = base64_enc_len(val->rs.len);
+		encoded_val.len = base16_enc_len(val->rs.len);
 		encoded_val.s = pkg_malloc(encoded_val.len);
 		len = base16_enc((unsigned char*)val->rs.s, val->rs.len, (unsigned char*)encoded_val.s, encoded_val.len);
 		if(len < 0) {
@@ -26,7 +24,7 @@ int serialize_ht_pair(pv_value_t* val, str* htname, str* s) {
 		}
 		encoded_val.len = len;
 	}
-	encoded_htname.len = base64_enc_len(htname->len);
+	encoded_htname.len = base16_enc_len(htname->len);
 	encoded_htname.s = pkg_malloc(encoded_htname.len);
 	len = base16_enc((unsigned char*)htname->s, htname->len, (unsigned char*)encoded_htname.s, encoded_htname.len);
 	if(len < 0) {
@@ -34,7 +32,17 @@ int serialize_ht_pair(pv_value_t* val, str* htname, str* s) {
 		goto error;
 	}
 	encoded_htname.len = len;
-	s->len = snprintf(s->s, s->len, "%d %d %.*s %.*s", val->flags, val->ri, STR_FMT(&encoded_htname), STR_FMT(&encoded_val));
+	
+	encoded_key.len = base16_enc_len(key->len);
+	encoded_key.s = pkg_malloc(encoded_key.len);
+	len = base16_enc((unsigned char*)key->s, key->len, (unsigned char*)encoded_key.s, encoded_key.len);
+	if(len < 0) {
+		LM_ERR("cannot encode key\n");
+		goto error;
+	}
+	encoded_key.len = len;
+	
+	s->len = snprintf(s->s, s->len, "%d %d %.*s %.*s %.*s", val->flags, val->ri, STR_FMT(&encoded_htname), STR_FMT(&encoded_key), STR_FMT(&encoded_val));
 	if(s->len < 0) {
 		LM_ERR("cannot serialize data - probably an small buffer\n");
 		goto error;
@@ -57,32 +65,46 @@ error:
 	return -1;
 }
 
-int deserialize_ht_pair(pv_value_t* val, str* htname, str* src) {
+int deserialize_ht_pair(str* key, pv_value_t* val, str* htname, str* src) {
 	str encoded_htname = {0, 0};
 	str encoded_val = {0, 0};
+	str encoded_key = {0, 0};
 	encoded_htname.s = pkg_malloc(src->len);
 	memset(encoded_htname.s, 0, src->len);
 	encoded_val.s = pkg_malloc(src->len);
 	memset(encoded_val.s, 0, src->len);
+	encoded_key.s = pkg_malloc(src->len);
+	memset(encoded_key.s, 0, src->len);
 	
-	sscanf(src->s, "%d %d %s %s", &val->flags, &val->ri, encoded_htname.s, encoded_val.s);
+	sscanf(src->s, "%d %d %s %s %s", &val->flags, &val->ri, encoded_htname.s, encoded_key.s, encoded_val.s);
 	encoded_htname.len = strlen(encoded_htname.s);
+	encoded_key.len = strlen(encoded_key.s);
 	encoded_val.len = strlen(encoded_val.s);
 	
-	if(base64_dec((unsigned char*)encoded_htname.s, encoded_htname.len, (unsigned char*)htname->s, htname->len) < 0) {
+	htname->len = base16_dec((unsigned char*)encoded_htname.s, encoded_htname.len, (unsigned char*)htname->s, htname->len);
+	if(htname->len < 0) {
 		LM_ERR("cannot decode htname\n");
 		goto error;
 	}
-	if(base64_dec((unsigned char*)encoded_val.s, encoded_val.len, (unsigned char*)val->rs.s, val->rs.len) < 0) {
+	val->rs.len = base16_dec((unsigned char*)encoded_val.s, encoded_val.len, (unsigned char*)val->rs.s, val->rs.len);
+	if(val->rs.len < 0) {
 		LM_ERR("cannot decode val\n");
 		goto error;
 	}
 	
+	key->len = base16_dec((unsigned char*)encoded_key.s, encoded_key.len, (unsigned char*)key->s, key->len);
+	if(key->len < 0) {
+		LM_ERR("cannot decode key\n");
+		goto error;
+	}
+	
 	pkg_free(encoded_htname.s);
+	pkg_free(encoded_key.s);
 	pkg_free(encoded_val.s);
 	return 0;
 error:
 	pkg_free(encoded_htname.s);
+	pkg_free(encoded_key.s);
 	pkg_free(encoded_val.s);
 	return -1;
 }

+ 10 - 0
modules_k/htable/ht_serialize.h

@@ -0,0 +1,10 @@
+#ifndef HT_SERIALIZE_H
+#define HT_SERIALIZE_H
+
+#include "ht_var.h"
+#include "../../str.h"
+#include "../../basex.h"
+
+int serialize_ht_pair(str* key, pv_value_t* val, str* htname, str* s);
+int deserialize_ht_pair(str* key, pv_value_t* val, str* htname, str* src);
+#endif

+ 44 - 3
modules_k/htable/ht_var.c

@@ -22,6 +22,8 @@
 		       
 #include "ht_api.h"
 #include "ht_var.h"
+#include "ht_serialize.h"
+#include "htable.h"
 
 /* pkg copy */
 ht_cell_t *_htc_local=NULL;
@@ -34,7 +36,6 @@ int pv_get_ht_cell(struct sip_msg *msg,  pv_param_t *param,
 	ht_pv_t *hpv;
 
 	hpv = (ht_pv_t*)param->pvn.u.dname;
-
 	if(hpv->ht==NULL)
 	{
 		hpv->ht = ht_get_table(&hpv->htname);
@@ -62,12 +63,39 @@ int pv_get_ht_cell(struct sip_msg *msg,  pv_param_t *param,
 	return pv_get_sintval(msg, param, res, htc->value.n);
 }
 
+int dmq_ht_set_cell(str* key, pv_value_t* val, str* htname) {
+	ht_t* ht = ht_get_table(htname);
+	int_str isval;
+	LM_ERR("dmq_ht_set_cell %.*s %p %d\n", STR_FMT(htname), ht, htname->len);
+	if(ht==NULL) {
+		LM_ERR("error getting table\n");
+		return -1;
+	}
+	if(val->flags&PV_TYPE_INT) {
+		isval.n = val->ri;
+		if(ht_set_cell(ht, key, 0, &isval, 1)!=0)
+		{
+			LM_ERR("cannot set $ht(%.*s)\n", htname->len, htname->s);
+			return -1;
+		}
+	} else {
+		isval.s = val->rs;
+		if(ht_set_cell(ht, key, AVP_VAL_STR, &isval, 1)!=0)
+		{
+			LM_ERR("cannot set $ht(%.*s)\n", htname->len, htname->s);
+			return -1;
+		}
+	}
+	return 0;
+}
+
 int pv_set_ht_cell(struct sip_msg* msg, pv_param_t *param,
 		int op, pv_value_t *val)
 {
 	str htname;
 	int_str isval;
 	ht_pv_t *hpv;
+	str serialized_ht;
 
 	hpv = (ht_pv_t*)param->pvn.u.dname;
 
@@ -81,7 +109,20 @@ int pv_set_ht_cell(struct sip_msg* msg, pv_param_t *param,
 		LM_ERR("cannot get $ht name\n");
 		return -1;
 	}
-	LM_DBG("set value for $ht(%.*s=>%.*s)\n", hpv->htname.len, hpv->htname.s,
+	if(ht_use_dmq) {
+		serialized_ht.s = pkg_malloc(MAX_HT_SERIALIZE_BUF);
+		serialized_ht.len = MAX_HT_SERIALIZE_BUF;
+		if(serialize_ht_pair(&htname, val, &hpv->htname, &serialized_ht) < 0) {
+			LM_ERR("cannot serialize attributes\n");
+			return -1;
+		}
+		if(ht_dmq_bind.bcast_message(ht_dmq_peer, &serialized_ht, NULL, &ht_dmq_resp_cback, 1) < 0) {
+			LM_ERR("error broadcasting dmq message\n");
+			return -1;
+		}
+		pkg_free(serialized_ht.s);
+	}
+	LM_ERR("set value for $ht(%.*s=>%.*s)\n", hpv->htname.len, hpv->htname.s,
 			htname.len, htname.s);
 	if((val==NULL) || (val->flags&PV_VAL_NULL))
 	{
@@ -162,6 +203,7 @@ int pv_parse_ht_name(pv_spec_p sp, str *in)
 		goto error;
 	}
 	hpv->ht = ht_get_table(&hpv->htname);
+	
 	sp->pvp.pvn.u.dname = (void*)hpv;
 	sp->pvp.pvn.type = PV_NAME_OTHER;
 	return 0;
@@ -181,7 +223,6 @@ int pv_get_ht_cell_expire(struct sip_msg *msg,  pv_param_t *param,
 	unsigned int now;
 
 	hpv = (ht_pv_t*)param->pvn.u.dname;
-
 	if(hpv->ht==NULL)
 	{
 		hpv->ht = ht_get_table(&hpv->htname);

+ 2 - 0
modules_k/htable/ht_var.h

@@ -25,6 +25,8 @@
 
 #include "../../pvar.h"
 
+int dmq_ht_set_cell(str*key, pv_value_t* val, str* htname);
+
 int pv_get_ht_cell(struct sip_msg *msg, pv_param_t *param,
 		pv_value_t *res);
 int pv_set_ht_cell(struct sip_msg* msg, pv_param_t *param,

+ 22 - 7
modules_k/htable/htable.c

@@ -39,9 +39,7 @@
 #include "../../lib/kcore/faked_msg.h"
 
 #include "../../pvar.h"
-#include "../dmq/dmq.h"
-#include "../../parser/msg_parser.h"
-#include "../../parser/parse_content.h"
+#include "htable.h"
 
 #include "ht_api.h"
 #include "ht_db.h"
@@ -53,12 +51,14 @@ MODULE_VERSION
 
 /* dmq API structure */
 dmq_api_t ht_dmq_bind;
-register_dmq_peer_t ht_register_dmq;
 dmq_peer_t* ht_dmq_peer;
+dmq_resp_cback_t ht_dmq_resp_cback;
 
 int dmq_htable_callback(struct sip_msg* msg, peer_reponse_t* resp) {
 	int content_length;
 	str body;
+	pv_value_t val;
+	str key, htname;
 	if(parse_headers(msg, HDR_EOH_F, 0) < 0) {
 		LM_ERR("error parsing message headers\n");
 		goto error;
@@ -74,7 +74,22 @@ int dmq_htable_callback(struct sip_msg* msg, peer_reponse_t* resp) {
 	}
 	body.s = get_body(msg);
 	body.len = content_length;
+	
+	htname.s = pkg_malloc(MAX_HT_SERIALIZE_BUF);
+	htname.len = MAX_HT_SERIALIZE_BUF;
+	key.s = pkg_malloc(MAX_HT_SERIALIZE_BUF);
+	key.len = MAX_HT_SERIALIZE_BUF;
+	deserialize_ht_pair(&key, &val, &htname, &body);
+	if(dmq_ht_set_cell(&key, &val, &htname) < 0) {
+		LM_ERR("error setting cell\n");
+		pkg_free(htname.s);
+		pkg_free(key.s);
+		return -1;
+	}
+	pkg_free(htname.s);
+	pkg_free(key.s);
 	LM_ERR("it worked - dmq module triggered the htable callback [%ld %d]\n", time(0), my_pid());
+	
 	str ct = str_init("text/xml");
 	str reason = str_init("200 OK");
 	resp->content_type = ct;
@@ -88,15 +103,16 @@ error:
 
 static void add_dmq_peer() {
 	dmq_peer_t htable_peer;
+	memset(&ht_dmq_resp_cback, 0, sizeof(ht_dmq_resp_cback));
 	htable_peer.peer_id.s = "htable";
 	htable_peer.peer_id.len = 6;
 	htable_peer.description.s = "ditributed htable implementation using dmq";
 	htable_peer.description.len = 42;
 	htable_peer.callback = dmq_htable_callback;
-	ht_dmq_peer = ht_register_dmq(&htable_peer);
+	ht_dmq_peer = ht_dmq_bind.register_dmq_peer(&htable_peer);
 }
 
-int  ht_timer_interval = 20;
+int ht_timer_interval = 20;
 int ht_use_dmq = 0;
 
 static int htable_init_rpc(void);
@@ -229,7 +245,6 @@ static int mod_init(void)
 			LM_ERR("cannot load dmq api\n");
 			return -1;
 		} else {
-			ht_register_dmq = ht_dmq_bind.register_dmq_peer;
 			add_dmq_peer();
 			LM_DBG("presence-dmq loaded\n");
 		}

+ 12 - 0
modules_k/htable/htable.h

@@ -0,0 +1,12 @@
+#ifndef HTABLE_H
+#define HTABLE_H
+#include "../dmq/dmq.h"
+#include "ht_serialize.h"
+#include "../../parser/msg_parser.h"
+#include "../../parser/parse_content.h"
+#define MAX_HT_SERIALIZE_BUF 2048
+extern int ht_use_dmq;
+extern dmq_api_t ht_dmq_bind;
+extern dmq_peer_t* ht_dmq_peer;
+extern dmq_resp_cback_t ht_dmq_resp_cback;
+#endif