浏览代码

rabbitmq: Add direct_reply_to parameter

Stefan Mititelu 8 年之前
父节点
当前提交
1ad9eb9dee
共有 3 个文件被更改,包括 66 次插入34 次删除
  1. 21 4
      src/modules/rabbitmq/README
  2. 22 0
      src/modules/rabbitmq/doc/rabbitmq_admin.xml
  3. 23 30
      src/modules/rabbitmq/rabbitmq.c

+ 21 - 4
src/modules/rabbitmq/README

@@ -32,6 +32,7 @@ Stefan-Cristian Mititelu
               3.5. port (int)
               3.6. timeout_sec (int)
               3.7. timeout_usec (int)
+              3.8. direct_reply_to (int)
 
         4. Functions
 
@@ -50,8 +51,9 @@ Stefan-Cristian Mititelu
    1.5. Set the “port” parameter
    1.6. Set the “timeout_sec” parameter
    1.7. Set the “timeout_usec” parameter
-   1.8. rabbitmq_publish usage
-   1.9. rabbitmq_publish_consume usage
+   1.8. Set the “direct_reply_to” parameter
+   1.9. rabbitmq_publish usage
+   1.10. rabbitmq_publish_consume usage
 
 Chapter 1. Admin Guide
 
@@ -72,6 +74,7 @@ Chapter 1. Admin Guide
         3.5. port (int)
         3.6. timeout_sec (int)
         3.7. timeout_usec (int)
+        3.8. direct_reply_to (int)
 
    4. Functions
 
@@ -118,6 +121,7 @@ Chapter 1. Admin Guide
    3.5. port (int)
    3.6. timeout_sec (int)
    3.7. timeout_usec (int)
+   3.8. direct_reply_to (int)
 
 3.1. username (string)
 
@@ -200,6 +204,19 @@ modparam("rabbitmq", "timeout_sec", 1)
 modparam("rabbitmq", "timeout_usec", 0)
 ...
 
+3.8. direct_reply_to (int)
+
+   Setting this parameter to 1, enables rabbitmq direct reply-to feature.
+   More info about this, can be found at
+   https://www.rabbitmq.com/direct-reply-to.html.
+
+   Default value is “0”.
+
+   Example 1.8. Set the “direct_reply_to” parameter
+...
+modparam("rabbitmq", "direct_reply_to", 1)
+...
+
 4. Functions
 
    4.1. rabbitmq_publish(exchange, routing_key, content_type, messagebody)
@@ -219,7 +236,7 @@ modparam("rabbitmq", "timeout_usec", 0)
 
    This function can be used from REQUEST_ROUTE.
 
-   Example 1.8. rabbitmq_publish usage
+   Example 1.9. rabbitmq_publish usage
 rabbitmq_publish("exchange", "routing_key", "application/json", "$avp(json_reque
 st)");
 
@@ -239,6 +256,6 @@ messagebody, reply)
 
    This function can be used from REQUEST_ROUTE.
 
-   Example 1.9. rabbitmq_publish_consume usage
+   Example 1.10. rabbitmq_publish_consume usage
 rabbitmq_publish_consume("exchange", "routing_key", "application/json", "$avp(js
 on_request)", "$avp(json_reply)");

+ 22 - 0
src/modules/rabbitmq/doc/rabbitmq_admin.xml

@@ -222,6 +222,28 @@ modparam("rabbitmq", "timeout_sec", 1)
 				<programlisting format="linespecific">
 ...
 modparam("rabbitmq", "timeout_usec", 0)
+...
+				</programlisting>
+			</example>
+		</section>
+
+		<section id="rabbitmq.p.direct_reply_to">
+			<title><varname>direct_reply_to</varname> (int)</title>
+			<para>
+				Setting this parameter to 1, enables rabbitmq direct reply-to feature. More info about this, can be found at https://www.rabbitmq.com/direct-reply-to.html.
+			</para>
+
+			<para>
+				<emphasis>
+					Default value is <quote>0</quote>.
+				</emphasis>
+			</para>
+
+			<example>
+				<title>Set the <quote>direct_reply_to</quote> parameter</title>
+				<programlisting format="linespecific">
+...
+modparam("rabbitmq", "direct_reply_to", 1)
 ...
 				</programlisting>
 			</example>

+ 23 - 30
src/modules/rabbitmq/rabbitmq.c

@@ -77,6 +77,7 @@ int amqp_port = 5672;
 int max_reconnect_attempts = 1;
 int timeout_sec = 1;
 int timeout_usec = 0;
+int direct_reply_to = 0;
 
 /* module helper functions */
 static int rabbitmq_connect(amqp_connection_state_t *conn);
@@ -130,6 +131,7 @@ static param_export_t params[] = {
 	{"port", PARAM_INT, &amqp_port},
 	{"timeout_sec", PARAM_INT, &timeout_sec},
 	{"timeout_usec", PARAM_INT, &timeout_usec},
+	{"direct_reply_to", PARAM_INT, &direct_reply_to},
 	{ 0, 0, 0}
 };
 
@@ -285,6 +287,8 @@ static int rabbitmq_publish_consume(struct sip_msg* msg, char* in_exchange, char
 	tv.tv_sec=timeout_sec;
 	tv.tv_usec=timeout_usec;
 
+	amqp_queue_declare_ok_t *reply_to;
+
 	// sanity checks
 	if (get_str_fparam(&exchange, msg, (fparam_t*)in_exchange) < 0) {
 		LM_ERR("failed to get exchange\n");
@@ -306,9 +310,6 @@ static int rabbitmq_publish_consume(struct sip_msg* msg, char* in_exchange, char
 		return -1;
 	}
 
-
-	amqp_bytes_t reply_to_queue;
-
 reconnect:
 	// open channel
 	amqp_channel_open(conn, 1);
@@ -342,25 +343,22 @@ reconnect:
 		return RABBITMQ_ERR_CHANNEL;
 	}
 
-	// alloc queue
-	amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1, amqp_empty_table);
+	// alloc reply_to queue
+	if (direct_reply_to == 1) {
+		reply_to = amqp_queue_declare(conn, 1, amqp_cstring_bytes("amq.rabbitmq.reply-to"), 0, 0, 0, 1, amqp_empty_table);
+	} else {
+		reply_to = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1, amqp_empty_table);
+	}
+
 	if (log_on_amqp_error(amqp_get_rpc_reply(conn), "amqp_queue_declare()") != AMQP_RESPONSE_NORMAL) {
 		LM_ERR("FAIL: amqp_queue_declare()\n");
 		amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
 		return RABBITMQ_ERR_QUEUE;
 	}
 
-	// alloc bytes
-	reply_to_queue = amqp_bytes_malloc_dup(r->queue);
-	LM_DBG("%.*s\n", (int)reply_to_queue.len, (char*)reply_to_queue.bytes);
-	if (reply_to_queue.bytes == NULL) {
-		amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
-		amqp_bytes_free(reply_to_queue);
-		LM_ERR("Out of memory while copying queue name");
-		return -1;
-	}
+	LM_INFO("reply_to = %.*s\n", (int)reply_to->queue.len, (char*)reply_to->queue.bytes);
 
-	// alloc properties
+	// alloc request properties
 	amqp_basic_properties_t props;
 	props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG |
 			AMQP_BASIC_DELIVERY_MODE_FLAG |
@@ -368,15 +366,22 @@ reconnect:
 			AMQP_BASIC_CORRELATION_ID_FLAG;
 	props.content_type = amqp_cstring_bytes(contenttype.s);
 	props.delivery_mode = 2; /* persistent delivery mode */
-	props.reply_to = amqp_bytes_malloc_dup(reply_to_queue);
+	props.reply_to = reply_to->queue;
 	if (props.reply_to.bytes == NULL) {
-		amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
-		amqp_bytes_free(reply_to_queue);
 		LM_ERR("Out of memory while copying queue name");
+		amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
 		return -1;
 	}
 	props.correlation_id = amqp_cstring_bytes("1");
 
+	// start consume
+	amqp_basic_consume(conn, 1, reply_to->queue, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
+	if (log_on_amqp_error(amqp_get_rpc_reply(conn), "amqp_basic_consume()") != AMQP_RESPONSE_NORMAL) {
+		LM_ERR("FAIL: amqp_basic_consume()\n");
+		amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
+		return RABBITMQ_ERR_CONSUME;
+	}
+
 	// publish
 	if (log_on_error(amqp_basic_publish(conn,1,
 		amqp_cstring_bytes(exchange.s),
@@ -388,20 +393,8 @@ reconnect:
 		"amqp_basic_publish()") != AMQP_RESPONSE_NORMAL) {
 			LM_ERR("FAIL: amqp_basic_publish()\n");
 			amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
-			amqp_bytes_free(reply_to_queue);
 			return RABBITMQ_ERR_PUBLISH;
 	}
-	amqp_bytes_free(props.reply_to);
-
-	// consume
-	amqp_basic_consume(conn, 1, reply_to_queue, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
-	if (log_on_amqp_error(amqp_get_rpc_reply(conn), "amqp_basic_consume()") != AMQP_RESPONSE_NORMAL) {
-		LM_ERR("FAIL: amqp_basic_consume()\n");
-		amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
-		amqp_bytes_free(reply_to_queue);
-		return RABBITMQ_ERR_CONSUME;
-	}
-	amqp_bytes_free(reply_to_queue);
 
 	// consume frame
 	for (;;) {