2
0
Эх сурвалжийг харах

jsonrpcs: hanlde delayed response for jsonrpc over http

Daniel-Constantin Mierla 8 жил өмнө
parent
commit
c73557694a

+ 128 - 12
src/modules/jsonrpcs/jsonrpcs_mod.c

@@ -42,6 +42,9 @@
 #include "../../core/cfg/cfg_struct.h"
 #include "../../core/cfg/cfg_struct.h"
 #include "../../core/resolve.h"
 #include "../../core/resolve.h"
 #include "../../core/ip_addr.h"
 #include "../../core/ip_addr.h"
+#include "../../core/sip_msg_clone.h"
+#include "../../core/data_lump.h"
+#include "../../core/data_lump_rpl.h"
 #include "../../modules/xhttp/api.h"
 #include "../../modules/xhttp/api.h"
 
 
 #include "jsonrpcs_mod.h"
 #include "jsonrpcs_mod.h"
@@ -249,6 +252,8 @@ static void jsonrpc_fault(jsonrpc_ctx_t* ctx, int code, char* fmt, ...)
 {
 {
 	va_list ap;
 	va_list ap;
 
 
+	jsonrpc_delayed_reply_ctx_init(ctx);
+
 	ctx->http_code = code;
 	ctx->http_code = code;
 	va_start(ap, fmt);
 	va_start(ap, fmt);
 	vsnprintf(jsonrpc_error_buf, JSONRPC_ERROR_REASON_BUF_LEN, fmt, ap);
 	vsnprintf(jsonrpc_error_buf, JSONRPC_ERROR_REASON_BUF_LEN, fmt, ap);
@@ -472,6 +477,8 @@ static int jsonrpc_add(jsonrpc_ctx_t* ctx, char* fmt, ...)
 	void **void_ptr;
 	void **void_ptr;
 	va_list ap;
 	va_list ap;
 
 
+	jsonrpc_delayed_reply_ctx_init(ctx);
+
 	va_start(ap, fmt);
 	va_start(ap, fmt);
 	while(*fmt) {
 	while(*fmt) {
 		if (*fmt == '{' || *fmt == '[') {
 		if (*fmt == '{' || *fmt == '[') {
@@ -647,6 +654,8 @@ static int jsonrpc_rpl_printf(jsonrpc_ctx_t* ctx, char* fmt, ...)
 	va_list ap;
 	va_list ap;
 	srjson_t *nj = NULL;
 	srjson_t *nj = NULL;
 
 
+	jsonrpc_delayed_reply_ctx_init(ctx);
+
 	buf = tbuf;
 	buf = tbuf;
 	buf_size = JSONRPC_PRINT_VALUE_BUF_LEN;
 	buf_size = JSONRPC_PRINT_VALUE_BUF_LEN;
 	while (1) {
 	while (1) {
@@ -889,11 +898,36 @@ static int jsonrpc_struct_printf(srjson_t *jnode, char* mname, char* fmt, ...)
 }
 }
 
 
 
 
+static void jsonrpc_clean_context(jsonrpc_ctx_t* ctx)
+{
+	if (!ctx) return;
+	srjson_DeleteDoc(ctx->jreq);
+	if(ctx->rpl_node!=NULL) {
+		srjson_Delete(ctx->jrpl, ctx->rpl_node);
+		ctx->rpl_node = NULL;
+	}
+	srjson_DeleteDoc(ctx->jrpl);
+}
+
+
 /** Returns the RPC capabilities supported by the xmlrpc driver.
 /** Returns the RPC capabilities supported by the xmlrpc driver.
  */
  */
 static rpc_capabilities_t jsonrpc_capabilities(jsonrpc_ctx_t* ctx)
 static rpc_capabilities_t jsonrpc_capabilities(jsonrpc_ctx_t* ctx)
 {
 {
-	/* No support for async commands.*/
+	/* support for async commands - delayed response */
+	return RPC_DELAYED_REPLY;
+}
+
+/** if this a delayed reply context,
+ * and it's never been use before, initialize it */
+static int jsonrpc_delayed_reply_ctx_init(jsonrpc_ctx_t* ctx)
+{
+	if  ((ctx->flags & JSONRPC_DELAYED_CTX_F)
+			&& (ctx->jrpl==0)) {
+		if (jsonrpc_init_reply(ctx) < 0)
+			return -1;
+		jsonrpc_reset_plain_reply(ctx->jrpl->free_fn);
+	}
 	return 0;
 	return 0;
 }
 }
 
 
@@ -908,6 +942,51 @@ static rpc_capabilities_t jsonrpc_capabilities(jsonrpc_ctx_t* ctx)
  */
  */
 static struct rpc_delayed_ctx* jsonrpc_delayed_ctx_new(jsonrpc_ctx_t* ctx)
 static struct rpc_delayed_ctx* jsonrpc_delayed_ctx_new(jsonrpc_ctx_t* ctx)
 {
 {
+	struct rpc_delayed_ctx* ret;
+	int size;
+	jsonrpc_ctx_t* r_ctx;
+	sip_msg_t* shm_msg;
+	int len;
+
+	ret=0;
+	shm_msg=0;
+
+	if (ctx->reply_sent) {
+		LM_ERR("response already sent - cannot create a delayed context\n");
+		return 0; /* no delayed reply if already replied */
+	}
+
+	if (ctx->transport!=JSONRPC_TRANS_HTTP) {
+		LM_ERR("delayed response implemented only for HTTP transport\n");
+		return 0;
+	}
+	/* clone the sip msg */
+	if(ctx->msg!=NULL) {
+		shm_msg=sip_msg_shm_clone(ctx->msg, &len, 1);
+		if (shm_msg==0)
+			goto error;
+	}
+
+	/* alloc into one block */
+	size=ROUND_POINTER(sizeof(*ret))+sizeof(jsonrpc_ctx_t);
+	if ((ret=shm_malloc(size))==0)
+		goto error;
+	memset(ret, 0, size);
+	ret->rpc=func_param;
+	ret->reply_ctx=(char*)ret+ROUND_POINTER(sizeof(*ret));
+	r_ctx=ret->reply_ctx;
+	r_ctx->flags=ctx->flags | JSONRPC_DELAYED_CTX_F;
+	r_ctx->transport=ctx->transport;
+	ctx->flags |= JSONRPC_DELAYED_REPLY_F;
+	r_ctx->msg=shm_msg;
+	r_ctx->msg_shm_block_size=len;
+
+	return ret;
+error:
+	if (shm_msg)
+		shm_free(shm_msg);
+	if (ret)
+		shm_free(ret);
 	return NULL;
 	return NULL;
 }
 }
 
 
@@ -918,21 +997,51 @@ static struct rpc_delayed_ctx* jsonrpc_delayed_ctx_new(jsonrpc_ctx_t* ctx)
  */
  */
 static void jsonrpc_delayed_ctx_close(struct rpc_delayed_ctx* dctx)
 static void jsonrpc_delayed_ctx_close(struct rpc_delayed_ctx* dctx)
 {
 {
-	return;
-}
+	jsonrpc_ctx_t* r_ctx;
+	hdr_field_t* hdr;
 
 
+	r_ctx=dctx->reply_ctx;
+	if (unlikely(!(r_ctx->flags & JSONRPC_DELAYED_CTX_F))){
+		BUG("reply ctx not marked as async/delayed\n");
+		goto error;
+	}
 
 
-static void jsonrpc_clean_context(jsonrpc_ctx_t* ctx)
-{
-	if (!ctx) return;
-	srjson_DeleteDoc(ctx->jreq);
-	if(ctx->rpl_node!=NULL) {
-		srjson_Delete(ctx->jrpl, ctx->rpl_node);
-		ctx->rpl_node = NULL;
+	if (jsonrpc_delayed_reply_ctx_init(r_ctx)<0)
+		goto error;
+
+	if (!r_ctx->reply_sent){
+		jsonrpc_send(r_ctx);
 	}
 	}
-	srjson_DeleteDoc(ctx->jrpl);
+error:
+	jsonrpc_clean_context(r_ctx);
+	if(r_ctx->msg) {
+		/* free added lumps (rpc_send adds a body lump) */
+		del_nonshm_lump( &(r_ctx->msg->add_rm) );
+		del_nonshm_lump( &(r_ctx->msg->body_lumps) );
+		del_nonshm_lump_rpl( &(r_ctx->msg->reply_lump) );
+		/* free header's parsed structures
+		 * that were added by failure handlers */
+		for( hdr=r_ctx->msg->headers ; hdr ; hdr=hdr->next ) {
+			if ( hdr->parsed && hdr_allocs_parse(hdr) &&
+					(hdr->parsed<(void*)r_ctx->msg ||
+					hdr->parsed>=(void*)(r_ctx->msg+r_ctx->msg_shm_block_size))) {
+				/* header parsed filed doesn't point inside uas.request memory
+				 * chunck -> it was added by failure funcs.-> free it as pkg */
+				DBG("removing hdr->parsed %d\n", hdr->type);
+				clean_hdr_field(hdr);
+				hdr->parsed = 0;
+			}
+		}
+		shm_free(r_ctx->msg);
+	}
+	r_ctx->msg=0;
+	dctx->reply_ctx=0;
+	shm_free(dctx);
+
+	return;
 }
 }
 
 
+
 static int mod_init(void)
 static int mod_init(void)
 {
 {
 
 
@@ -1027,6 +1136,9 @@ static int child_init(int rank)
 	return 0;
 	return 0;
 }
 }
 
 
+/**
+ *
+ */
 static void mod_destroy(void)
 static void mod_destroy(void)
 {
 {
 	jsonrpc_fifo_destroy();
 	jsonrpc_fifo_destroy();
@@ -1035,6 +1147,9 @@ static void mod_destroy(void)
 	return;
 	return;
 }
 }
 
 
+/**
+ *
+ */
 static int jsonrpc_dispatch(sip_msg_t* msg, char* s1, char* s2)
 static int jsonrpc_dispatch(sip_msg_t* msg, char* s1, char* s2)
 {
 {
 	rpc_export_t* rpce;
 	rpc_export_t* rpce;
@@ -1067,6 +1182,7 @@ static int jsonrpc_dispatch(sip_msg_t* msg, char* s1, char* s2)
 		LM_ERR("invalid json doc [[%s]]\n", ctx->jreq->buf.s);
 		LM_ERR("invalid json doc [[%s]]\n", ctx->jreq->buf.s);
 		return NONSIP_MSG_ERROR;
 		return NONSIP_MSG_ERROR;
 	}
 	}
+	ctx->transport = JSONRPC_TRANS_HTTP;
 	if (jsonrpc_init_reply(ctx) < 0) goto send_reply;
 	if (jsonrpc_init_reply(ctx) < 0) goto send_reply;
 
 
 	/* sanity checks on jsonrpc request */
 	/* sanity checks on jsonrpc request */
@@ -1106,7 +1222,7 @@ static int jsonrpc_dispatch(sip_msg_t* msg, char* s1, char* s2)
 	rpce->function(&func_param, ctx);
 	rpce->function(&func_param, ctx);
 
 
 send_reply:
 send_reply:
-	if (!ctx->reply_sent) {
+	if (!ctx->reply_sent && !(ctx->flags&JSONRPC_DELAYED_REPLY_F)) {
 		ret = jsonrpc_send(ctx);
 		ret = jsonrpc_send(ctx);
 	}
 	}
 	jsonrpc_clean_context(ctx);
 	jsonrpc_clean_context(ctx);

+ 13 - 0
src/modules/jsonrpcs/jsonrpcs_mod.h

@@ -40,6 +40,8 @@
  */
  */
 typedef struct jsonrpc_ctx {
 typedef struct jsonrpc_ctx {
 	sip_msg_t* msg;        /**< The SIP/HTTP received message. */
 	sip_msg_t* msg;        /**< The SIP/HTTP received message. */
+	int msg_shm_block_size; /**< non-zero for delayed reply contexts with
+							  shm cloned msgs */
 	char* method;          /**< Name of the management function to be called */
 	char* method;          /**< Name of the management function to be called */
 	unsigned int flags;    /**< Various flags, such as return value type */
 	unsigned int flags;    /**< Various flags, such as return value type */
 	srjson_doc_t *jreq;    /**< JSON request document */
 	srjson_doc_t *jreq;    /**< JSON request document */
@@ -50,8 +52,19 @@ typedef struct jsonrpc_ctx {
 	int error_code;        /**< Json error code */
 	int error_code;        /**< Json error code */
 	int http_code;         /**< http reply code */
 	int http_code;         /**< http reply code */
 	str http_text;         /**< http reply reason text */
 	str http_text;         /**< http reply reason text */
+	int transport;         /**< RPC transport */
 } jsonrpc_ctx_t;
 } jsonrpc_ctx_t;
 
 
+/* extra rpc_ctx_t flags */
+/* first 8 bits reserved for rpc flags (e.g. RET_ARRAY) */
+#define JSONRPC_DELAYED_CTX_F	256
+#define JSONRPC_DELAYED_REPLY_F	512
+
+#define JSONRPC_TRANS_NONE	0
+#define JSONRPC_TRANS_HTTP	1
+#define JSONRPC_TRANS_FIFO	2
+#define JSONRPC_TRANS_DGRAM	3
+
 typedef struct jsonrpc_plain_reply {
 typedef struct jsonrpc_plain_reply {
 	int rcode;         /**< reply code */
 	int rcode;         /**< reply code */
 	str rtext;         /**< reply reason text */
 	str rtext;         /**< reply reason text */