Ver Fonte

kazoo: add amqps connection support

lazedo há 9 anos atrás
pai
commit
97cd5d3188
2 ficheiros alterados com 106 adições e 33 exclusões
  1. 11 0
      modules/kazoo/kazoo.c
  2. 95 33
      modules/kazoo/kz_amqp.c

+ 11 - 0
modules/kazoo/kazoo.c

@@ -109,6 +109,12 @@ db_func_t kz_pa_dbf;
 str kz_presentity_table = str_init("presentity");
 str kz_db_url = {0,0};
 
+str kz_amqps_ca_cert = {0,0};
+str kz_amqps_cert = {0,0};
+str kz_amqps_key = {0,0};
+int kz_amqps_verify_peer = 1;
+int kz_amqps_verify_hostname = 1;
+
 str kz_query_timeout_avp = {0,0};
 pv_spec_t kz_query_timeout_spec;
 
@@ -201,6 +207,11 @@ static param_export_t params[] = {
     {"amqp_primary_zone", STR_PARAM, &dbk_primary_zone_name.s},
     {"amqp_command_hashtable_size", INT_PARAM, &dbk_command_table_size},
     {"amqp_result_avp", STR_PARAM, &kz_query_result_avp.s},
+    {"amqps_ca_cert", STR_PARAM, &kz_amqps_ca_cert.s},
+    {"amqps_cert", STR_PARAM, &kz_amqps_cert.s},
+    {"amqps_key", STR_PARAM, &kz_amqps_key.s},
+    {"amqps_verify_peer", INT_PARAM, &kz_amqps_verify_peer},
+    {"amqps_verify_hostname", INT_PARAM, &kz_amqps_verify_hostname},
     {0, 0, 0}
 };
 

+ 95 - 33
modules/kazoo/kz_amqp.c

@@ -4,6 +4,7 @@
 #include <amqp.h>
 #include <amqp_framing.h>
 #include <amqp_tcp_socket.h>
+#include <amqp_ssl_socket.h>
 #include <json.h>
 #include <uuid/uuid.h>
 #include "../../mem/mem.h"
@@ -39,6 +40,12 @@ extern struct timeval kz_amqp_tv;
 extern struct timeval kz_qtimeout_tv;
 extern struct timeval kz_timer_tv;
 
+extern str kz_amqps_ca_cert;
+extern str kz_amqps_cert;
+extern str kz_amqps_key;
+extern int kz_amqps_verify_peer;
+extern int kz_amqps_verify_hostname;
+
 extern pv_spec_t kz_query_timeout_spec;
 
 const amqp_bytes_t kz_amqp_empty_bytes = { 0, NULL };
@@ -727,8 +734,88 @@ void kz_amqp_channel_close(kz_amqp_conn_ptr rmq, amqp_channel_t channel) {
 	kz_amqp_error("closing channel", amqp_channel_close(rmq->conn, channel, AMQP_REPLY_SUCCESS));
 }
 
+int kz_ssl_initialized = 0;
+
+int kz_amqp_connection_open_ssl(kz_amqp_conn_ptr rmq) {
+
+    if(!kz_ssl_initialized) {
+	    kz_ssl_initialized = 1;
+	    amqp_set_initialize_ssl_library(1);
+    }
+
+    if (!(rmq->conn = amqp_new_connection())) {
+	LM_ERR("Failed to create new AMQP connection\n");
+	goto error;
+    }
+
+    rmq->socket = amqp_ssl_socket_new(rmq->conn);
+    if (!rmq->socket) {
+	LM_ERR("Failed to create SSL socket to AMQP broker\n");
+	goto error;
+    }
+
+    if (kz_amqps_ca_cert.s) {
+      if (amqp_ssl_socket_set_cacert(rmq->socket, kz_amqps_ca_cert.s)) {
+	LM_ERR("Failed to set CA certificate for amqps connection\n");
+	goto nosocket;
+      }
+    }
+
+    if (kz_amqps_cert.s && kz_amqps_key.s) {
+      if (amqp_ssl_socket_set_key(rmq->socket, kz_amqps_cert.s, kz_amqps_key.s)) {
+	LM_ERR("Failed to set client key/certificate for amqps connection\n");
+	goto nosocket;
+      }
+    }
+
+#if AMQP_VERSION_MAJOR == 0 && AMQP_VERSION_MINOR < 8
+    amqp_ssl_socket_set_verify(rmq->socket, kz_amqps_verify_peer | kz_amqps_verify_hostname);
+#else
+    amqp_ssl_socket_set_verify_peer(rmq->socket, kz_amqps_verify_peer);
+    amqp_ssl_socket_set_verify_hostname(rmq->socket, kz_amqps_verify_hostname);
+#endif
+
+    if (amqp_socket_open(rmq->socket, rmq->server->connection->info.host, rmq->server->connection->info.port)) {
+	LM_ERR("Failed to open SSL socket to AMQP broker : %s : %i\n",
+	rmq->server->connection->info.host, rmq->server->connection->info.port);
+	goto nosocket;
+    }
+
+    if (kz_amqp_error("Logging in", amqp_login(rmq->conn,
+					   rmq->server->connection->info.vhost,
+					   0,
+					   131072,
+					   dbk_use_hearbeats,
+					   AMQP_SASL_METHOD_PLAIN,
+					   rmq->server->connection->info.user,
+					   rmq->server->connection->info.password))) {
+
+	LM_ERR("Login to AMQP broker failed!\n");
+	goto error;
+    }
+
+	rmq->state = KZ_AMQP_CONNECTION_OPEN;
+	return 0;
+
+nosocket:
+    if (amqp_destroy_connection(rmq->conn) < 0) {
+	LM_ERR("cannot destroy connection\n");
+    }
+    return -1;
+
+ error:
+    kz_amqp_connection_close(rmq);
+    return -1;
+}
+
 int kz_amqp_connection_open(kz_amqp_conn_ptr rmq) {
+	rmq->state = KZ_AMQP_CONNECTION_CLOSED;
 	rmq->channel_count = rmq->channel_counter = 0;
+
+	if(rmq->server->connection->info.ssl)
+		return kz_amqp_connection_open_ssl(rmq);
+
+    rmq->channel_count = rmq->channel_counter = 0;
     if (!(rmq->conn = amqp_new_connection())) {
     	LM_DBG("Failed to create new AMQP connection\n");
     	goto error;
@@ -758,7 +845,8 @@ int kz_amqp_connection_open(kz_amqp_conn_ptr rmq) {
     	goto error;
     }
 
-    return 0;
+	rmq->state = KZ_AMQP_CONNECTION_OPEN;
+	return 0;
 
  error:
     kz_amqp_connection_close(rmq);
@@ -2118,37 +2206,10 @@ int kz_amqp_connect(kz_amqp_conn_ptr rmq)
 	if(rmq->state != KZ_AMQP_CONNECTION_CLOSED) {
 		kz_amqp_connection_close(rmq);
 	}
-	rmq->state = KZ_AMQP_CONNECTION_CLOSED;
-	rmq->channel_count = rmq->channel_counter = 0;
-    if (!(rmq->conn = amqp_new_connection())) {
-    	LM_DBG("Failed to create new AMQP connection\n");
-    	goto error;
-    }
 
-    rmq->socket = amqp_tcp_socket_new(rmq->conn);
-    if (!rmq->socket) {
-    	LM_DBG("Failed to create TCP socket to AMQP broker\n");
-    	goto error;
-    }
-
-    if (amqp_socket_open(rmq->socket, rmq->server->connection->info.host, rmq->server->connection->info.port)) {
-    	LM_DBG("Failed to open TCP socket to AMQP broker\n");
-    	goto error;
-    }
-
-    if (kz_amqp_error("Logging in", amqp_login(rmq->conn,
-					   rmq->server->connection->info.vhost,
-					   0,
-					   131072,
-					   dbk_use_hearbeats,
-					   AMQP_SASL_METHOD_PLAIN,
-					   rmq->server->connection->info.user,
-					   rmq->server->connection->info.password))) {
+	if(kz_amqp_connection_open(rmq) != 0)
+		goto error;
 
-    	LM_ERR("Login to AMQP broker failed!\n");
-    	goto error;
-    }
-	rmq->state = KZ_AMQP_CONNECTION_OPEN;
 	kz_amqp_fire_connection_event("open", rmq->server->connection->info.host);
 	for(i=0,channel_res=0; i < dbk_channels && channel_res == 0; i++) {
 			/* start cleanup */
@@ -2170,10 +2231,10 @@ int kz_amqp_connect(kz_amqp_conn_ptr rmq)
 
     return 0;
 
- error:
-    kz_amqp_connection_close(rmq);
-    kz_amqp_handle_server_failure(rmq);
+error:
+	kz_amqp_handle_server_failure(rmq);
 	return -1;
+
 }
 
 void kz_amqp_reconnect_cb(int fd, short event, void *arg)
@@ -2651,6 +2712,7 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr)
 					OK = kz_amqp_consume_error(consumer);
 					break;
 				default:
+					LM_ERR("AMQP_RESPONSE_LIBRARY_EXCEPTION %i\n", reply.library_error);
 					OK = 0;
 					break;
 				};