Pārlūkot izejas kodu

rabbitmq: functions exported to kemi framework

Daniel-Constantin Mierla 7 gadi atpakaļ
vecāks
revīzija
97b1fa994d
1 mainītis faili ar 124 papildinājumiem un 57 dzēšanām
  1. 124 57
      src/modules/rabbitmq/rabbitmq.c

+ 124 - 57
src/modules/rabbitmq/rabbitmq.c

@@ -44,6 +44,7 @@
 #include "../../core/mod_fix.h"
 #include "../../core/pvar.h"
 #include "../../core/lvalue.h"
+#include "../../core/kemi.h"
 #include <stdio.h>
 #include <stdlib.h>
 #include <stdio.h>
@@ -124,7 +125,8 @@ static cmd_export_t cmds[] = {
 };
 
 /* module parameters */
-static param_export_t params[] = {{"url", PARAM_STRING, &amqp_url},
+static param_export_t params[] = {
+	{"url", PARAM_STRING, &amqp_url},
 	{"timeout_sec", PARAM_INT, &timeout_sec},
 	{"timeout_usec", PARAM_INT, &timeout_usec},
 	{"direct_reply_to", PARAM_INT, &direct_reply_to},
@@ -178,33 +180,11 @@ static int mod_child_init(int rank)
 }
 
 /* module helper functions */
-static int rabbitmq_publish(struct sip_msg *msg, char *in_exchange,
-		char *in_routingkey, char *in_contenttype, char *in_messagebody)
+static int ki_rabbitmq_publish(sip_msg_t *msg, str *exchange, str *routingkey,
+	str *contenttype, str *messagebody)
 {
 	int reconnect_attempts = 0;
 	int log_ret;
-	str exchange, routingkey, messagebody, contenttype;
-
-	// sanity checks
-	if(get_str_fparam(&exchange, msg, (fparam_t *)in_exchange) < 0) {
-		LM_ERR("failed to get exchange\n");
-		return -1;
-	}
-
-	if(get_str_fparam(&routingkey, msg, (fparam_t *)in_routingkey) < 0) {
-		LM_ERR("failed to get kouting key\n");
-		return -1;
-	}
-
-	if(get_str_fparam(&messagebody, msg, (fparam_t *)in_messagebody) < 0) {
-		LM_ERR("failed to get message body\n");
-		return -1;
-	}
-
-	if(get_str_fparam(&contenttype, msg, (fparam_t *)in_contenttype) < 0) {
-		LM_ERR("failed to get content type\n");
-		return -1;
-	}
 
 reconnect:
 	// open channel
@@ -244,14 +224,14 @@ reconnect:
 	amqp_basic_properties_t props;
 	props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG
 				   | AMQP_BASIC_CORRELATION_ID_FLAG;
-	props.content_type = amqp_cstring_bytes(contenttype.s);
+	props.content_type = amqp_cstring_bytes(contenttype->s);
 	props.delivery_mode = 2; /* persistent delivery mode */
 	props.correlation_id = amqp_cstring_bytes("1");
 
 	// publish
-	if(log_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes(exchange.s),
-							amqp_cstring_bytes(routingkey.s), 0, 0, &props,
-							amqp_cstring_bytes(messagebody.s)),
+	if(log_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes(exchange->s),
+							amqp_cstring_bytes(routingkey->s), 0, 0, &props,
+							amqp_cstring_bytes(messagebody->s)),
 			   "amqp_basic_publish()")
 			!= AMQP_RESPONSE_NORMAL) {
 		// debug
@@ -274,31 +254,11 @@ reconnect:
 	return RABBITMQ_OK;
 }
 
-static int rabbitmq_publish_consume(struct sip_msg *msg, char *in_exchange,
-		char *in_routingkey, char *in_contenttype, char *in_messagebody,
-		char *reply)
+/* module helper functions */
+static int rabbitmq_publish(struct sip_msg *msg, char *in_exchange,
+		char *in_routingkey, char *in_contenttype, char *in_messagebody)
 {
-	pv_spec_t *dst;
-	pv_value_t val;
 	str exchange, routingkey, messagebody, contenttype;
-	amqp_frame_t frame;
-	amqp_basic_deliver_t *d;
-	amqp_basic_properties_t *p;
-	int result = RABBITMQ_OK;
-	int reconnect_attempts = 0;
-	int log_ret;
-	size_t body_target;
-	size_t body_received;
-
-	struct timeval tv;
-	tv.tv_sec = timeout_sec;
-	tv.tv_usec = timeout_usec;
-
-	amqp_queue_declare_ok_t *reply_to;
-
-	uuid_t uuid;
-	char uuid_buffer[40];
-	char reply_to_buffer[64];
 
 	// sanity checks
 	if(get_str_fparam(&exchange, msg, (fparam_t *)in_exchange) < 0) {
@@ -321,6 +281,34 @@ static int rabbitmq_publish_consume(struct sip_msg *msg, char *in_exchange,
 		return -1;
 	}
 
+	return ki_rabbitmq_publish(msg, &exchange, &routingkey, &contenttype,
+			&messagebody);
+}
+
+static int rabbitmq_publish_consume_helper(sip_msg_t *msg, str *exchange,
+		str *routingkey, str *contenttype, str *messagebody,
+		pv_spec_t *dst)
+{
+	pv_value_t val;
+	amqp_frame_t frame;
+	amqp_basic_deliver_t *d;
+	amqp_basic_properties_t *p;
+	int result = RABBITMQ_OK;
+	int reconnect_attempts = 0;
+	int log_ret;
+	size_t body_target;
+	size_t body_received;
+
+	struct timeval tv;
+	tv.tv_sec = timeout_sec;
+	tv.tv_usec = timeout_usec;
+
+	amqp_queue_declare_ok_t *reply_to;
+
+	uuid_t uuid;
+	char uuid_buffer[40];
+	char reply_to_buffer[64];
+
 reconnect:
 	// open channel
 	amqp_channel_open(conn, 1);
@@ -386,7 +374,7 @@ reconnect:
 	amqp_basic_properties_t props;
 	props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG
 				   | AMQP_BASIC_REPLY_TO_FLAG | AMQP_BASIC_CORRELATION_ID_FLAG;
-	props.content_type = amqp_cstring_bytes(contenttype.s);
+	props.content_type = amqp_cstring_bytes(contenttype->s);
 	props.delivery_mode = 2; /* persistent delivery mode */
 	props.reply_to = reply_to->queue;
 	if(props.reply_to.bytes == NULL) {
@@ -407,9 +395,9 @@ reconnect:
 	}
 
 	// publish
-	if(log_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes(exchange.s),
-							amqp_cstring_bytes(routingkey.s), 0, 0, &props,
-							amqp_cstring_bytes(messagebody.s)),
+	if(log_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes(exchange->s),
+							amqp_cstring_bytes(routingkey->s), 0, 0, &props,
+							amqp_cstring_bytes(messagebody->s)),
 			   "amqp_basic_publish()")
 			!= AMQP_RESPONSE_NORMAL) {
 		LM_ERR("FAIL: amqp_basic_publish()\n");
@@ -493,7 +481,6 @@ reconnect:
 
 			LM_DBG("RPC Call result: %.*s\n", val.rs.len, val.rs.s);
 			val.flags = PV_VAL_STR;
-			dst = (pv_spec_t *)reply;
 			dst->setf(msg, &dst->pvp, (int)EQ_T, &val);
 		}
 
@@ -513,6 +500,60 @@ reconnect:
 	return result;
 }
 
+static int ki_rabbitmq_publish_consume(sip_msg_t *msg, str *exchange,
+		str *routingkey, str *contenttype, str *messagebody, str *dpv)
+{
+	pv_spec_t *dst;
+
+	dst = pv_cache_get(dpv);
+
+	if(dpv==NULL) {
+		LM_ERR("failed getting pv: %.*s\n", dpv->len, dpv->s);
+		return -1;
+	}
+	if(dst->setf == NULL) {
+		LM_ERR("result pvar is not writeble: %.*s\n", dpv->len, dpv->s);
+		return -1;
+	}
+
+	return rabbitmq_publish_consume_helper(msg, exchange, routingkey,
+			contenttype, messagebody, dst);
+}
+
+static int rabbitmq_publish_consume(struct sip_msg *msg, char *in_exchange,
+		char *in_routingkey, char *in_contenttype, char *in_messagebody,
+		char *reply)
+{
+	str exchange, routingkey, messagebody, contenttype;
+	pv_spec_t *dst;
+
+	// sanity checks
+	if(get_str_fparam(&exchange, msg, (fparam_t *)in_exchange) < 0) {
+		LM_ERR("failed to get exchange\n");
+		return -1;
+	}
+
+	if(get_str_fparam(&routingkey, msg, (fparam_t *)in_routingkey) < 0) {
+		LM_ERR("failed to get kouting key\n");
+		return -1;
+	}
+
+	if(get_str_fparam(&messagebody, msg, (fparam_t *)in_messagebody) < 0) {
+		LM_ERR("failed to get message body\n");
+		return -1;
+	}
+
+	if(get_str_fparam(&contenttype, msg, (fparam_t *)in_contenttype) < 0) {
+		LM_ERR("failed to get content type\n");
+		return -1;
+	}
+
+	dst = (pv_spec_t *)reply;
+
+	return rabbitmq_publish_consume_helper(msg, &exchange, &routingkey,
+			&contenttype, &messagebody, dst);
+}
+
 static int rabbitmq_connect(amqp_connection_state_t *conn)
 {
 	int ret;
@@ -602,4 +643,30 @@ static int rabbitmq_reconnect(amqp_connection_state_t *conn)
 	}
 
 	return RABBITMQ_OK;
+}
+
+/**
+ *
+ */
+/* clang-format off */
+static sr_kemi_t sr_kemi_rabbitmq_exports[] = {
+	{ str_init("rabbitmq"), str_init("publish"),
+		SR_KEMIP_INT, ki_rabbitmq_publish,
+		{ SR_KEMIP_STR, SR_KEMIP_STR, SR_KEMIP_STR,
+			SR_KEMIP_STR, SR_KEMIP_NONE, SR_KEMIP_NONE }
+	},
+	{ str_init("rabbitmq"), str_init("publish_consume"),
+		SR_KEMIP_INT, ki_rabbitmq_publish_consume,
+		{ SR_KEMIP_STR, SR_KEMIP_STR, SR_KEMIP_STR,
+			SR_KEMIP_STR, SR_KEMIP_STR, SR_KEMIP_NONE }
+	},
+
+	{ {0, 0}, {0, 0}, 0, NULL, { 0, 0, 0, 0, 0, 0 } }
+};
+/* clang-format on */
+
+int mod_register(char *path, int *dlflags, void *p1, void *p2)
+{
+	sr_kemi_modules_add(sr_kemi_rabbitmq_exports);
+	return 0;
 }