浏览代码

rabbitmq: don't create reply-to queue on publish

When using the rabbitmq_publish function, there is no need to create
a reply to queue, because it will never be read. And since there is
never a real consumer, so the queue will never be deleted. This
will eventually cloak up the RabbitMQ server with millions of
generic reply queues.
This bug has been fixed in master already, so this is basically a
backport.
Sebastian Damm 8 年之前
父节点
当前提交
9abd1e002e
共有 1 个文件被更改,包括 0 次插入35 次删除
  1. 0 35
      src/modules/rabbitmq/rabbitmq.c

+ 0 - 35
src/modules/rabbitmq/rabbitmq.c

@@ -175,7 +175,6 @@ static int rabbitmq_publish(struct sip_msg* msg, char* in_exchange, char* in_rou
 	int reconnect_attempts = 0;
 	int reconnect_attempts = 0;
 	int log_ret;
 	int log_ret;
 	str exchange, routingkey, messagebody, contenttype;
 	str exchange, routingkey, messagebody, contenttype;
-	amqp_bytes_t reply_to_queue;
 
 
 	// sanity checks
 	// sanity checks
 	if (get_str_fparam(&exchange, msg, (fparam_t*)in_exchange) < 0) {
 	if (get_str_fparam(&exchange, msg, (fparam_t*)in_exchange) < 0) {
@@ -231,44 +230,13 @@ reconnect:
 		return RABBITMQ_ERR_CHANNEL;
 		return RABBITMQ_ERR_CHANNEL;
 	}
 	}
 
 
-	// alloc queue
-	amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1, amqp_empty_table);
-	if (log_on_amqp_error(amqp_get_rpc_reply(conn), "amqp_queue_declare()") != AMQP_RESPONSE_NORMAL) {
-		LM_ERR("FAIL: amqp_queue_declare()\n");
-		amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
-		return RABBITMQ_ERR_QUEUE;
-	}
-
-	// alloc bytes
-	reply_to_queue = amqp_bytes_malloc_dup(r->queue);
-	LM_DBG("%.*s\n", (int)reply_to_queue.len, (char*)reply_to_queue.bytes);
-	if (reply_to_queue.bytes == NULL) {
-		amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
-		amqp_bytes_free(reply_to_queue);
-		LM_ERR("Out of memory while copying queue name");
-		return -1;
-	}
-
 	// alloc properties
 	// alloc properties
 	amqp_basic_properties_t props;
 	amqp_basic_properties_t props;
 	props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG |
 	props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG |
 			AMQP_BASIC_DELIVERY_MODE_FLAG |
 			AMQP_BASIC_DELIVERY_MODE_FLAG |
-			AMQP_BASIC_REPLY_TO_FLAG |
 			AMQP_BASIC_CORRELATION_ID_FLAG;
 			AMQP_BASIC_CORRELATION_ID_FLAG;
 	props.content_type = amqp_cstring_bytes(contenttype.s);
 	props.content_type = amqp_cstring_bytes(contenttype.s);
 	props.delivery_mode = 2; /* persistent delivery mode */
 	props.delivery_mode = 2; /* persistent delivery mode */
-	props.reply_to = amqp_bytes_malloc_dup(reply_to_queue);
-	if (props.reply_to.bytes == NULL) {
-		// debug
-		LM_ERR("Out of memory while copying queue name");
-
-		// cleanup
-		amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
-		amqp_bytes_free(reply_to_queue);
-
-		// error
-		return -1;
-	}
 	props.correlation_id = amqp_cstring_bytes("1");
 	props.correlation_id = amqp_cstring_bytes("1");
 
 
 	// publish
 	// publish
@@ -285,7 +253,6 @@ reconnect:
 
 
 			// cleanup
 			// cleanup
 			amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
 			amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
-			amqp_bytes_free(reply_to_queue);
 
 
 			// error
 			// error
 			return RABBITMQ_ERR_PUBLISH;
 			return RABBITMQ_ERR_PUBLISH;
@@ -295,8 +262,6 @@ reconnect:
 	LM_DBG("SUCCESS: amqp_basic_publish()\n");
 	LM_DBG("SUCCESS: amqp_basic_publish()\n");
 
 
 	// cleanup
 	// cleanup
-	amqp_bytes_free(props.reply_to);
-	amqp_bytes_free(reply_to_queue);
 	amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
 	amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
 
 
 	// success
 	// success