tm: implemented t_uac_wait_block rpc command

- it blocks while waiting for the reply, to return its code and reason text
Daniel-Constantin Mierla, 4 years ago
parent
commit
dc5a548a9e
3 files changed with 274 additions and 6 deletions
  1. +257 -6
      src/modules/tm/rpc_uac.c
  2. +4 -0
      src/modules/tm/rpc_uac.h
  3. +13 -0
      src/modules/tm/tm.c
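
In short: the blocking variant registers a local TM callback (rpc_uac_block_callback) that stores the reply code and reason text in a shared-memory list, keyed by a per-request SRUID value. The RPC process then polls that list every 100ms for up to 80 seconds, while a timer routine runs every 10 seconds and discards stored responses older than 300 seconds.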

+ 257 - 6
src/modules/tm/rpc_uac.c

@@ -23,17 +23,202 @@
 #include "../../core/ut.h"
 #include "../../core/parser/parse_from.h"
 #include "../../core/str_list.h"
+#include "../../core/timer_proc.h"
+#include "../../core/utils/sruid.h"
 #include "ut.h"
 #include "dlg.h"
 #include "uac.h"
 #include "callid.h"
 
 
-
 /* RPC substitution char (used in rpc_t_uac headers) */
 #define SUBST_CHAR '!'
 
+#define TM_RPC_RESPONSE_LIFETIME 300
+#define TM_RPC_RESPONSE_TIMERSTEP 10
+
+void tm_rpc_response_list_clean(unsigned int ticks, void *param);
+
+typedef struct tm_rpc_response {
+	str ruid;
+	int flags;
+	int rcode;
+	str rtext;
+	time_t rtime;
+	struct tm_rpc_response *next;
+} tm_rpc_response_t;
+
+typedef struct tm_rpc_response_list {
+	gen_lock_t rlock;
+	tm_rpc_response_t *rlist;
+} tm_rpc_response_list_t;
+
+static tm_rpc_response_list_t *_tm_rpc_response_list = NULL;
+
+static sruid_t _tm_rpc_sruid;
+
+/**
+ * initialize the shared list of rpc responses and register the cleanup timer
+ */
+int tm_rpc_response_list_init(void)
+{
+	if(_tm_rpc_response_list != NULL) {
+		return 0;
+	}
+	if(sruid_init(&_tm_rpc_sruid, '-', "tmrp", SRUID_INC)<0) {
+		LM_ERR("failed to init sruid\n");
+		return -1;
+	}
+	if(sr_wtimer_add(tm_rpc_response_list_clean, 0,
+				TM_RPC_RESPONSE_TIMERSTEP)<0) {
+		LM_ERR("failed to register timer routine\n");
+		return -1;
+	}
+	_tm_rpc_response_list = shm_malloc(sizeof(tm_rpc_response_list_t));
+	if(_tm_rpc_response_list == NULL) {
+		SHM_MEM_ERROR;
+		return -1;
+	}
+
+	memset(_tm_rpc_response_list, 0, sizeof(tm_rpc_response_list_t));
+
+	lock_init(&_tm_rpc_response_list->rlock);
 
+	return 0;
+}
+
+/**
+ * free all response items and destroy the shared list
+ */
+int tm_rpc_response_list_destroy(void)
+{
+	tm_rpc_response_t *rl0 = NULL;
+	tm_rpc_response_t *rl1 = NULL;
+
+	if(_tm_rpc_response_list == NULL) {
+		return 0;
+	}
+
+	rl1 = _tm_rpc_response_list->rlist;
+
+	while(rl1!=NULL) {
+		rl0 = rl1;
+		rl1 = rl1->next;
+		shm_free(rl0);
+	}
+	lock_destroy(&_tm_rpc_response_list->rlock);
+	shm_free(_tm_rpc_response_list);
+	_tm_rpc_response_list = NULL;
+
+	return 0;
+}
+
+/**
+ * add a response item (ruid, code, reason text) to the shared list
+ */
+int tm_rpc_response_list_add(str *ruid, int rcode, str *rtext)
+{
+	size_t rsize = 0;
+	tm_rpc_response_t *ri = NULL;
+	if(_tm_rpc_response_list == NULL) {
+		LM_ERR("rpc response list not initialized\n");
+		return -1;
+	}
+
+	rsize = sizeof(tm_rpc_response_t) + ruid->len + 2
+				+ ((rtext!=NULL)?rtext->len:0);
+
+	ri = (tm_rpc_response_t*)shm_malloc(rsize);
+	if(ri==NULL) {
+		SHM_MEM_ERROR;
+		return -1;
+	}
+	memset(ri, 0, rsize);
+
+	ri->ruid.s = (char*)ri + sizeof(tm_rpc_response_t);
+	ri->ruid.len = ruid->len;
+	memcpy(ri->ruid.s, ruid->s, ruid->len);
+	ri->rtime = time(NULL);
+	ri->rcode = rcode;
+	if(rtext!=NULL) {
+		ri->rtext.s = ri->ruid.s + ri->ruid.len + 1;
+		ri->rtext.len = rtext->len;
+		memcpy(ri->rtext.s, rtext->s, rtext->len);
+	}
+	lock_get(&_tm_rpc_response_list->rlock);
+	ri->next = _tm_rpc_response_list->rlist;
+	_tm_rpc_response_list->rlist = ri;
+	lock_release(&_tm_rpc_response_list->rlock);
+
+	return 0;
+}
+
+/**
+ * find the response item matching ruid, unlink and return it
+ * (the caller takes ownership and has to shm_free() it)
+ */
+tm_rpc_response_t *tm_rpc_response_list_get(str *ruid)
+{
+	tm_rpc_response_t *ri0 = NULL;
+	tm_rpc_response_t *ri1 = NULL;
+
+	if(_tm_rpc_response_list == NULL) {
+		LM_ERR("rpc response list not initialized\n");
+		return NULL;
+	}
+
+	lock_get(&_tm_rpc_response_list->rlock);
+	ri1 = _tm_rpc_response_list->rlist;
+	while(ri1!=NULL) {
+		if(ri1->ruid.len==ruid->len
+				&& memcmp(ri1->ruid.s, ruid->s, ruid->len)==0) {
+			if(ri0 == NULL) {
+				_tm_rpc_response_list->rlist = ri1->next;
+			} else {
+				ri0->next = ri1->next;
+			}
+			lock_release(&_tm_rpc_response_list->rlock);
+			return ri1;
+		}
+		ri0 = ri1;
+		ri1 = ri1->next;
+	}
+	lock_release(&_tm_rpc_response_list->rlock);
+	return NULL;
+}
+
+/**
+ * timer routine removing response items older than TM_RPC_RESPONSE_LIFETIME
+ */
+void tm_rpc_response_list_clean(unsigned int ticks, void *param)
+{
+	tm_rpc_response_t *ri0 = NULL;
+	tm_rpc_response_t *ri1 = NULL;
+	time_t tnow;
+
+	if(_tm_rpc_response_list == NULL) {
+		return;
+	}
+
+	tnow = time(NULL);
+	lock_get(&_tm_rpc_response_list->rlock);
+	ri1 = _tm_rpc_response_list->rlist;
+	while(ri1!=NULL) {
+		if(ri1->rtime < tnow - TM_RPC_RESPONSE_LIFETIME) {
+			if(ri0 == NULL) {
+				_tm_rpc_response_list->rlist = ri1->next;
+			} else {
+				ri0->next = ri1->next;
+			}
+			LM_DBG("freeing item [%.*s]\n", ri1->ruid.len, ri1->ruid.s);
+			shm_free(ri1);
+			/* ri0 stays in place; if the head was removed, restart from new head */
+			ri1 = (ri0!=NULL)?ri0->next:_tm_rpc_response_list->rlist;
+		} else {
+			ri0 = ri1;
+			ri1 = ri1->next;
+		}
+	}
+	lock_release(&_tm_rpc_response_list->rlock);
+}
 
 /** make sure the rpc user created the msg properly.
  * Make sure that the FIFO user created the message
@@ -393,6 +578,25 @@ static void rpc_uac_callback(struct cell* t, int type, struct tmcb_params* ps)
 }
 
 
+/* t_uac callback for the blocking mode - stores the reply code and reason text */
+static void rpc_uac_block_callback(struct cell* t, int type,
+		struct tmcb_params* ps)
+{
+	str *ruid;
+	str rtext;
+
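+	/* the parameter is the shm-cloned ruid; the callback takes ownership and frees it */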
+	ruid = (str*)(*ps->param);
+	*ps->param=0;
+	if (ps->rpl==FAKED_REPLY) {
+		rtext.s = error_text(ps->code);
+		rtext.len = strlen(rtext.s);
+	} else {
+		rtext = ps->rpl->first_line.u.reply.reason;
+	}
+	tm_rpc_response_list_add(ruid, ps->code, &rtext);
+	shm_free(ruid);
+}
+
 
 /** rpc t_uac version-
  * It expects the following list of strings as parameters:
@@ -433,11 +637,15 @@ static void rpc_t_uac(rpc_t* rpc, void* c, int reply_wait)
 	dlg_t dlg;
 	uac_req_t uac_req;
 	rpc_delayed_ctx_t* dctx;
+	str *ruid = NULL;
+	tm_rpc_response_t *ritem = NULL;
+	int rcount = 0;
+	void* th = NULL;
 
 	body.s=0;
 	body.len=0;
 	dctx=0;
-	if (reply_wait && (rpc->capabilities == 0 ||
+	if (reply_wait==1 && (rpc->capabilities == 0 ||
 				!(rpc->capabilities(c) & RPC_DELAYED_REPLY))) {
 		rpc->fault(c, 600, "Reply wait/async mode not supported"
 				" by this rpc transport");
@@ -538,7 +746,7 @@ static void rpc_t_uac(rpc_t* rpc, void* c, int reply_wait)
 	if(hfb.s!=NULL && hfb.len>0) uac_req.headers=&hfb;
 	uac_req.body=body.len?&body:0;
 	uac_req.dialog=&dlg;
-	if (reply_wait){
+	if (reply_wait==1){
 		dctx=rpc->delayed_ctx_new(c);
 		if (dctx==0){
 			rpc->fault(c, 500, "internal error: failed to create context");
@@ -551,22 +759,57 @@ static void rpc_t_uac(rpc_t* rpc, void* c, int reply_wait)
 		   want to still send a reply */
 		rpc=&dctx->rpc;
 		c=dctx->reply_ctx;
+	} else if (reply_wait==2) {
+		sruid_next(&_tm_rpc_sruid);
+		uac_req.cb = rpc_uac_block_callback;
+		ruid = shm_str_dup_block(&_tm_rpc_sruid.uid);
+		if(ruid==NULL) {
+			rpc->fault(c, 500, "internal error: failed to clone ruid");
+			goto error01;
+		}
+		uac_req.cbp = ruid;
+		uac_req.cb_flags = TMCB_LOCAL_COMPLETED;
 	}
+
 	ret = t_uac(&uac_req);
 
 	if (ret <= 0) {
 		err_ret = err2reason_phrase(ret, &sip_error, err_buf,
 				sizeof(err_buf), "RPC/UAC") ;
-		if (err_ret > 0 )
-		{
+		if (err_ret > 0 ) {
 			rpc->fault(c, sip_error, "%s", err_buf);
 		} else {
 			rpc->fault(c, 500, "RPC/UAC error");
 		}
-		if (dctx)
+		if (dctx) {
 			rpc->delayed_ctx_close(dctx);
+		}
+		if(ruid) {
+			shm_free(ruid);
+		}
 		goto error01;
 	}
+
+	if(reply_wait==2) {
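+		/* poll for the stored reply: up to 800 * 100ms = 80 seconds */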
+		while(ritem==NULL && rcount<800) {
+			sleep_us(100000);
+			rcount++;
+			ritem = tm_rpc_response_list_get(&_tm_rpc_sruid.uid);
+		}
+		if(ritem == NULL) {
+			rpc->fault(c, 500, "No response");
+		} else {
+			/* add structure node */
+			if (rpc->add(c, "{", &th) < 0) {
+				rpc->fault(c, 500, "Structure error");
+			} else {
+				if(rpc->struct_add(th, "dS",
+						"code", 	ritem->rcode,
+						"text", 	&ritem->rtext)<0) {
+					rpc->fault(c, 500, "Fields error");
+					/* fall through so ritem is freed and cleanup at error01 runs */
+				}
+			}
+			shm_free(ritem);
+		}
+	}
+
 error01:
 	if (hfb.s) pkg_free(hfb.s);
 error:
@@ -590,6 +833,14 @@ void rpc_t_uac_wait(rpc_t* rpc, void* c)
 	rpc_t_uac(rpc, c, 1);
 }
 
+/** t_uac with blocking for reply waiting.
+ * @see rpc_t_uac.
+ */
+void rpc_t_uac_wait_block(rpc_t* rpc, void* c)
+{
+	rpc_t_uac(rpc, c, 2);
+}
+
 
 static int t_uac_check_msg(struct sip_msg* msg,
 		str* method, str* body,
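
Each response item above is stored in a single shared-memory block (the struct plus both strings), which is why tm_rpc_response_list_add() computes rsize up front and why one shm_free() releases a whole item. Below is a minimal standalone sketch of that layout, assuming plain malloc()/free() in place of the shm allocator and a hypothetical item_t type:

/* Standalone sketch (not part of the commit) of the single-allocation
 * layout used by tm_rpc_response_list_add(): the item struct and both
 * strings share one block, so a single free() releases everything.
 * Plain calloc()/free() stands in for Kamailio's shm allocator. */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

typedef struct item {
	char *ruid;        /* points inside this same allocation */
	int rcode;
	char *rtext;       /* also points inside this allocation */
	time_t rtime;
} item_t;

static item_t *item_new(const char *ruid, int rcode, const char *rtext)
{
	size_t rlen = strlen(ruid);
	size_t tlen = (rtext != NULL) ? strlen(rtext) : 0;
	/* struct + ruid + '\0' + rtext + '\0' in one block (calloc zero-fills,
	 * so both strings come out NUL-terminated) */
	item_t *it = (item_t *)calloc(1, sizeof(item_t) + rlen + 1 + tlen + 1);
	if (it == NULL)
		return NULL;
	it->ruid = (char *)it + sizeof(item_t);
	memcpy(it->ruid, ruid, rlen);
	it->rcode = rcode;
	it->rtime = time(NULL);
	if (rtext != NULL) {
		it->rtext = it->ruid + rlen + 1;
		memcpy(it->rtext, rtext, tlen);
	}
	return it;
}

int main(void)
{
	item_t *it = item_new("tmrp-abc123-1", 200, "OK");
	if (it == NULL)
		return 1;
	printf("%s -> %d %s\n", it->ruid, it->rcode, it->rtext);
	free(it); /* one call frees the struct and both strings */
	return 0;
}

Because ruid and rtext point inside the same block, no separate frees are needed when an item is taken off the list.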

+ 4 - 0
src/modules/tm/rpc_uac.h

@@ -21,8 +21,12 @@
 #include "../../core/rpc.h"
 
 
+int tm_rpc_response_list_init(void);
+int tm_rpc_response_list_destroy(void);
+
 void rpc_t_uac_start(rpc_t* rpc, void* c);
 void rpc_t_uac_wait(rpc_t* rpc, void* c);
+void rpc_t_uac_wait_block(rpc_t* rpc, void* c);
 
 int t_uac_send(str *method, str *ruri, str *nexthop, str *send_socket,
 		str *headers, str *body);

+ 13 - 0
src/modules/tm/tm.c

@@ -723,6 +723,11 @@ static int mod_init(void)
 		return -1;
 	}
 
+	if(tm_rpc_response_list_init()<0) {
+		LM_ERR("failed to init the rpc response list\n");
+		return -1;
+	}
+
 	if(on_sl_reply_name.s!=NULL && on_sl_reply_name.len>0) {
 		keng = sr_kemi_eng_get();
 		if(keng==NULL) {
@@ -2767,6 +2772,13 @@ static const char* rpc_t_uac_wait_doc[2] = {
 	0
 };
 
+static const char* rpc_t_uac_wait_block_doc[2] = {
+	"starts a tm uac and waits for the final reply in blocking mode, using a"
+		" list of string parameters: method, ruri, dst_uri, send_sock, headers"
+		" (CRLF separated) and body (optional)",
+	0
+};
+
 static const char* tm_rpc_list_doc[2] = {
 	"List transactions.",
 	0
@@ -2787,6 +2799,7 @@ static rpc_export_t tm_rpc[] = {
 	{"tm.hash_stats",  tm_rpc_hash_stats, tm_rpc_hash_stats_doc, 0},
 	{"tm.t_uac_start", rpc_t_uac_start, rpc_t_uac_start_doc, 0 },
 	{"tm.t_uac_wait",  rpc_t_uac_wait,  rpc_t_uac_wait_doc, RET_ARRAY},
+	{"tm.t_uac_wait_block",  rpc_t_uac_wait_block,  rpc_t_uac_wait_block_doc, 0},
 	{"tm.list",  tm_rpc_list,  tm_rpc_list_doc, RET_ARRAY},
 	{"tm.clean", tm_rpc_clean,  tm_rpc_clean_doc, 0},
 	{0, 0, 0, 0}
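
Note that tm.t_uac_wait_block is exported without the RET_ARRAY flag, since it replies with a single structure holding the reply code and text. An illustrative (unverified) invocation via kamcmd, assuming the usual '.' placeholder for the empty dst_uri and send_sock parameters and example SIP addresses, might look like:

kamcmd tm.t_uac_wait_block OPTIONS sip:alice@192.0.2.10 . . 'From: <sip:test@example.com>\r\nTo: <sip:alice@192.0.2.10>\r\n' ''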