소스 검색

rabbitmq: Adding amqps support (#3511)

* rabbitmq: Adding amqps support

Adding support for secure AMQP connections over TLS (amqps).

* rabbitmq: Adding amqps support

Adding support for secure AMQP connections over TLS (amqps).

rabbitmq: Adding amqps support

Adding support for secure AMQP connections over TLS (amqps).

rabbitmq: Format fixes

Some style format fixes

* rabbitmq: Format fixes

Fixing some missing spaces

* rabbitmq: Typo fix

Fixing inilialized by initialized
joelbax 2 년 전
부모
커밋
78c0275b90
2개의 변경된 파일35개의 추가작업 그리고 2개의 파일을 삭제
  1. 33 2
      src/modules/rabbitmq/rabbitmq.c
  2. 2 0
      src/modules/rabbitmq/rabbitmq.h

+ 33 - 2
src/modules/rabbitmq/rabbitmq.c

@@ -52,6 +52,7 @@
 
 #include <stdint.h>
 #include <amqp_tcp_socket.h>
+#include <amqp_ssl_socket.h>
 #include <amqp.h>
 #include <amqp_framing.h>
 
@@ -73,10 +74,12 @@ static amqp_connection_state_t amqp_conn = NULL;
 /* module parameters */
 static struct amqp_connection_info amqp_info;
 static char *amqp_url = RABBITMQ_DEFAULT_AMQP_URL;
+static char *rmq_amqps_ca_file = NULL;
 static int max_reconnect_attempts = 1;
 static int timeout_sec = 1;
 static int timeout_usec = 0;
 static int direct_reply_to = 0;
+static int amqp_ssl_init_called = 0;
 
 /* module helper functions */
 static int rabbitmq_connect(amqp_connection_state_t *conn);
@@ -124,6 +127,7 @@ static cmd_export_t cmds[] = {
 
 /* module parameters */
 static param_export_t params[] = {{"url", PARAM_STRING, &amqp_url},
+		{"amqps_ca_file", PARAM_STRING, &rmq_amqps_ca_file},
 		{"timeout_sec", PARAM_INT, &timeout_sec},
 		{"timeout_usec", PARAM_INT, &timeout_usec},
 		{"direct_reply_to", PARAM_INT, &direct_reply_to}, {0, 0, 0}};
@@ -557,25 +561,52 @@ static int rabbitmq_connect(amqp_connection_state_t *conn)
 	int ret;
 	int log_ret;
 	//	amqp_rpc_reply_t reply;
+	
+	// amqp_ssl_init_called should only be called once
+	if(amqp_info.ssl && !amqp_ssl_init_called) {
+		amqp_set_initialize_ssl_library(1);
+		amqp_ssl_init_called = 1;
+		LM_DBG("AMQP SSL library initialized\n");
+	}
 
 	// establish a new connection to RabbitMQ server
 	*conn = amqp_new_connection();
+	if(!conn) {
+		LM_ERR("FAIL: create AMQP connection\n");
+		return RABBITMQ_ERR_CREATE;
+	}
 	log_ret = log_on_amqp_error(
 			amqp_get_rpc_reply(*conn), "amqp_new_connection()");
 	if(log_ret != AMQP_RESPONSE_NORMAL && log_ret != AMQP_RESPONSE_NONE) {
 		return RABBITMQ_ERR_CONNECT;
 	}
 
-	amqp_sock = amqp_tcp_socket_new(*conn);
+	amqp_sock = (amqp_info.ssl) ? amqp_ssl_socket_new(*conn)
+								: amqp_tcp_socket_new(*conn);
 	if(!amqp_sock) {
 		LM_ERR("FAIL: create TCP amqp_sock");
 		amqp_destroy_connection(*conn);
 		return RABBITMQ_ERR_SOCK;
 	}
 
+	if(rmq_amqps_ca_file) {
+		if(amqp_ssl_socket_set_cacert(amqp_sock, rmq_amqps_ca_file)) {
+			LM_ERR("Failed to set CA certificate for amqps connection\n");
+			return RABBITMQ_ERR_SSL_CACERT;
+		}
+	}
+
+#if AMQP_VERSION_MAJOR == 0 && AMQP_VERSION_MINOR < 8
+	amqp_ssl_socket_set_verify(amqp_sock, 1);
+#else
+	amqp_ssl_socket_set_verify_peer(amqp_sock, 1);
+	amqp_ssl_socket_set_verify_hostname(amqp_sock, 1);
+#endif
+
 	ret = amqp_socket_open(amqp_sock, amqp_info.host, amqp_info.port);
 	if(ret != AMQP_STATUS_OK) {
-		LM_ERR("FAIL: open TCP sock, amqp_status=%d", ret);
+		LM_ERR("FAIL: open %s sock, amqp_status=%d",
+				(amqp_info.ssl) ? "SSL" : "TCP", ret);
 		// amqp_destroy_connection(*conn);
 		return RABBITMQ_ERR_SOCK;
 	}

+ 2 - 0
src/modules/rabbitmq/rabbitmq.h

@@ -49,8 +49,10 @@ typedef enum
 	RABBITMQ_ERR_CHANNEL,
 	RABBITMQ_ERR_QUEUE,
 	RABBITMQ_ERR_PUBLISH,
+	RABBITMQ_ERR_CREATE,
 	RABBITMQ_ERR_SOCK,
 	RABBITMQ_ERR_CONSUME,
+	RABBITMQ_ERR_SSL_CACERT,
 	RABBITMQ_ERR_NULL,
 } RABBITMQ_ENUM;