|
@@ -127,6 +127,17 @@ void kz_amqp_free_bind(kz_amqp_bind_ptr bind)
|
|
shm_free(bind);
|
|
shm_free(bind);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+void kz_amqp_free_connection(kz_amqp_conn_ptr conn)
|
|
|
|
+{
|
|
|
|
+ if(!conn)
|
|
|
|
+ return;
|
|
|
|
+
|
|
|
|
+ if(conn->url)
|
|
|
|
+ shm_free(conn->url);
|
|
|
|
+ shm_free(conn);
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+
|
|
void kz_amqp_free_pipe_cmd(kz_amqp_cmd_ptr cmd)
|
|
void kz_amqp_free_pipe_cmd(kz_amqp_cmd_ptr cmd)
|
|
{
|
|
{
|
|
if(cmd == NULL)
|
|
if(cmd == NULL)
|
|
@@ -269,9 +280,9 @@ void kz_amqp_destroy() {
|
|
if(kz_pool != NULL) {
|
|
if(kz_pool != NULL) {
|
|
kz_amqp_conn_ptr conn = kz_pool->head;
|
|
kz_amqp_conn_ptr conn = kz_pool->head;
|
|
while(conn != NULL) {
|
|
while(conn != NULL) {
|
|
- kz_amqp_conn_ptr free = conn;
|
|
|
|
|
|
+ kz_amqp_conn_ptr tofree = conn;
|
|
conn = conn->next;
|
|
conn = conn->next;
|
|
- shm_free(free);
|
|
|
|
|
|
+ kz_amqp_free_connection(tofree);
|
|
}
|
|
}
|
|
shm_free(kz_pool);
|
|
shm_free(kz_pool);
|
|
}
|
|
}
|
|
@@ -279,13 +290,45 @@ void kz_amqp_destroy() {
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+#define KZ_URL_MAX_SIZE 50
|
|
|
|
+static char* KZ_URL_ROOT = "/";
|
|
|
|
+
|
|
int kz_amqp_add_connection(modparam_t type, void* val)
|
|
int kz_amqp_add_connection(modparam_t type, void* val)
|
|
{
|
|
{
|
|
kz_amqp_init_connection_pool(); // find a better way
|
|
kz_amqp_init_connection_pool(); // find a better way
|
|
|
|
|
|
|
|
+ char* url = (char*) val;
|
|
|
|
+ int len = strlen(url);
|
|
|
|
+ if(len > KZ_URL_MAX_SIZE) {
|
|
|
|
+ LM_ERR("connection url exceeds max size %d\n", KZ_URL_MAX_SIZE);
|
|
|
|
+ return -1;
|
|
|
|
+ }
|
|
|
|
+
|
|
kz_amqp_conn_ptr newConn = shm_malloc(sizeof(kz_amqp_conn));
|
|
kz_amqp_conn_ptr newConn = shm_malloc(sizeof(kz_amqp_conn));
|
|
memset(newConn, 0, sizeof(kz_amqp_conn));
|
|
memset(newConn, 0, sizeof(kz_amqp_conn));
|
|
|
|
|
|
|
|
+ newConn->url = shm_malloc( (KZ_URL_MAX_SIZE + 1) * sizeof(char) );
|
|
|
|
+ memset(newConn->url, 0, (KZ_URL_MAX_SIZE + 1) * sizeof(char));
|
|
|
|
+ // maintain compatibility
|
|
|
|
+ if (!strncmp((char*)val, "kazoo://", 8)) {
|
|
|
|
+ sprintf(newConn->url, "amqp://%s", (char*)(url+(8*sizeof(char))) );
|
|
|
|
+ } else {
|
|
|
|
+ strcpy(newConn->url, url);
|
|
|
|
+ newConn->url[len] = '\0';
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if(amqp_parse_url(newConn->url, &newConn->info) == AMQP_STATUS_BAD_URL) {
|
|
|
|
+ LM_ERR("ERROR PARSING URL \"%s\"\n", newConn->url);
|
|
|
|
+ goto error;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ if(newConn->info.vhost == NULL) {
|
|
|
|
+ newConn->info.vhost = KZ_URL_ROOT;
|
|
|
|
+ } else if(newConn->info.vhost[0] == '/' && strlen(newConn->info.vhost) == 1) { // bug in amqp_parse_url ?
|
|
|
|
+ newConn->info.vhost++;
|
|
|
|
+ }
|
|
|
|
+
|
|
if(kz_pool->head == NULL)
|
|
if(kz_pool->head == NULL)
|
|
kz_pool->head = newConn;
|
|
kz_pool->head = newConn;
|
|
|
|
|
|
@@ -294,9 +337,12 @@ int kz_amqp_add_connection(modparam_t type, void* val)
|
|
|
|
|
|
kz_pool->tail = newConn;
|
|
kz_pool->tail = newConn;
|
|
|
|
|
|
- amqp_parse_url((char*)val, &newConn->info);
|
|
|
|
-
|
|
|
|
return 0;
|
|
return 0;
|
|
|
|
+
|
|
|
|
+error:
|
|
|
|
+ kz_amqp_free_connection(newConn);
|
|
|
|
+ return -1;
|
|
|
|
+
|
|
}
|
|
}
|
|
|
|
|
|
void kz_amqp_connection_close(kz_amqp_conn_ptr rmq) {
|
|
void kz_amqp_connection_close(kz_amqp_conn_ptr rmq) {
|
|
@@ -313,9 +359,6 @@ void kz_amqp_connection_close(kz_amqp_conn_ptr rmq) {
|
|
rmq->conn = NULL;
|
|
rmq->conn = NULL;
|
|
rmq->socket = NULL;
|
|
rmq->socket = NULL;
|
|
rmq->channel_count = 0;
|
|
rmq->channel_count = 0;
|
|
-
|
|
|
|
-// lock_release(&kz_pool->lock);
|
|
|
|
-
|
|
|
|
}
|
|
}
|
|
|
|
|
|
}
|
|
}
|
|
@@ -348,7 +391,7 @@ int kz_amqp_connection_open(kz_amqp_conn_ptr rmq) {
|
|
}
|
|
}
|
|
|
|
|
|
if (kz_amqp_error("Logging in", amqp_login(rmq->conn,
|
|
if (kz_amqp_error("Logging in", amqp_login(rmq->conn,
|
|
- "/", //rmq->info.vhost,
|
|
|
|
|
|
+ rmq->info.vhost,
|
|
0,
|
|
0,
|
|
131072,
|
|
131072,
|
|
0,
|
|
0,
|