浏览代码

kazoo : more federation tweaks

when using federation, only use federation exchanges for zones != primary
lazedo 10 年之前
父节点
当前提交
324d64e9ed
共有 1 个文件被更改,包括 54 次插入54 次删除
  1. 54 54
      modules/kazoo/kz_amqp.c

+ 54 - 54
modules/kazoo/kz_amqp.c

@@ -1475,76 +1475,76 @@ int kz_amqp_bind_targeted_channel(kz_amqp_conn_ptr kz_conn, int idx )
 
 int kz_amqp_bind_consumer(kz_amqp_conn_ptr kz_conn, kz_amqp_bind_ptr bind, int idx, kz_amqp_channel_ptr chan)
 {
-    int ret = -1;
-    amqp_bytes_t federated_exchange = {0, 0};
-    amqp_bytes_t federated_routing_key = {0, 0};
+	int ret = -1;
+	amqp_bytes_t federated_exchange = {0, 0};
+	amqp_bytes_t federated_routing_key = {0, 0};
 	char _federated[100];
 
-	if(bind->federate == 0 || dbk_use_federated_exchange == 0) {
-		amqp_exchange_declare(kz_conn->conn, chan[idx].channel, bind->exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table);
-		if (kz_amqp_error("Declaring exchange", amqp_get_rpc_reply(kz_conn->conn)))
-		{
-			ret = -RET_AMQP_ERROR;
-			goto error;
-		}
-	}
+	if(bind->federate == 0
+		|| dbk_use_federated_exchange == 0
+		|| kz_conn->server->zone == kz_amqp_get_primary_zone()) {
+			amqp_exchange_declare(kz_conn->conn, chan[idx].channel, bind->exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table);
+			if (kz_amqp_error("Declaring exchange", amqp_get_rpc_reply(kz_conn->conn))) {
+				ret = -RET_AMQP_ERROR;
+				goto error;
+			}
+    }
 
-	if(bind->federate == 1 && dbk_use_federated_exchange == 1) {
-    	federated_exchange = kz_local_amqp_bytes_dup_from_string(dbk_federated_exchange.s);
-		amqp_exchange_declare(kz_conn->conn, chan[idx].channel, federated_exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table);
-		if (kz_amqp_error("Declaring federated exchange", amqp_get_rpc_reply(kz_conn->conn)))
-		{
-			ret = -RET_AMQP_ERROR;
-			goto error;
-		}
+	if(bind->federate == 1
+		&& dbk_use_federated_exchange == 1
+		&& kz_conn->server->zone != kz_amqp_get_primary_zone()) {
+			federated_exchange = kz_local_amqp_bytes_dup_from_string(dbk_federated_exchange.s);
+			amqp_exchange_declare(kz_conn->conn, chan[idx].channel, federated_exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table);
+			if (kz_amqp_error("Declaring federated exchange", amqp_get_rpc_reply(kz_conn->conn))) {
+				ret = -RET_AMQP_ERROR;
+				goto error;
+			}
     }
 
-    amqp_queue_declare(kz_conn->conn, chan[idx].channel, bind->queue, bind->passive, bind->durable, bind->exclusive, bind->auto_delete, kz_amqp_empty_table);
-    if (kz_amqp_error("Declaring queue", amqp_get_rpc_reply(kz_conn->conn)))
-    {
+	amqp_queue_declare(kz_conn->conn, chan[idx].channel, bind->queue, bind->passive, bind->durable, bind->exclusive, bind->auto_delete, kz_amqp_empty_table);
+	if (kz_amqp_error("Declaring queue", amqp_get_rpc_reply(kz_conn->conn))) {
 		ret = -RET_AMQP_ERROR;
 		goto error;
-    }
-
-	if(bind->federate == 0 || dbk_use_federated_exchange == 0) {
-		LM_DBG("QUEUE BIND\n");
-		if (amqp_queue_bind(kz_conn->conn, chan[idx].channel, bind->queue, bind->exchange, bind->routing_key, kz_amqp_empty_table) < 0
-			|| kz_amqp_error("Binding queue", amqp_get_rpc_reply(kz_conn->conn)))
-		{
-			ret = -RET_AMQP_ERROR;
-			goto error;
-		}
 	}
 
-    if(bind->federate == 1 && dbk_use_federated_exchange == 1) {
+    if(bind->federate == 0
+		|| dbk_use_federated_exchange == 0
+		|| kz_conn->server->zone == kz_amqp_get_primary_zone()) {
+			if (amqp_queue_bind(kz_conn->conn, chan[idx].channel, bind->queue, bind->exchange, bind->routing_key, kz_amqp_empty_table) < 0
+				|| kz_amqp_error("Binding queue", amqp_get_rpc_reply(kz_conn->conn))) {
+				ret = -RET_AMQP_ERROR;
+				goto error;
+			}
+    }
+
+	if(bind->federate == 1
+		&& dbk_use_federated_exchange == 1
+		&& kz_conn->server->zone != kz_amqp_get_primary_zone()) {
     	sprintf(_federated, "%.*s%s%.*s", (int)bind->exchange.len, (char*)bind->exchange.bytes,
-    			(bind->routing_key.len == 0 ? "" : "."),
+				(bind->routing_key.len == 0 ? "" : "."),
 				(int)bind->routing_key.len, (char*)bind->routing_key.bytes
-    			);
-    	federated_routing_key = kz_local_amqp_bytes_dup_from_string(_federated);
+				);
+		federated_routing_key = kz_local_amqp_bytes_dup_from_string(_federated);
 		if (amqp_queue_bind(kz_conn->conn, chan[idx].channel, bind->queue, federated_exchange, federated_routing_key, kz_amqp_empty_table) < 0
-			|| kz_amqp_error("Binding queue to federated exchange", amqp_get_rpc_reply(kz_conn->conn)))
-		{
-			ret = -RET_AMQP_ERROR;
-			goto error;
+			|| kz_amqp_error("Binding queue to federated exchange", amqp_get_rpc_reply(kz_conn->conn))) {
+				ret = -RET_AMQP_ERROR;
+				goto error;
 		}
-    }
+	}
 
-    LM_DBG("BASIC CONSUME\n");
-    if (amqp_basic_consume(kz_conn->conn, chan[idx].channel, bind->queue, kz_amqp_empty_bytes, 0, bind->no_ack, 0, kz_amqp_empty_table) < 0
-	    || kz_amqp_error("Consuming", amqp_get_rpc_reply(kz_conn->conn)))
-    {
-		ret = -RET_AMQP_ERROR;
-		goto error;
-    }
+	if (amqp_basic_consume(kz_conn->conn, chan[idx].channel, bind->queue, kz_amqp_empty_bytes, 0, bind->no_ack, 0, kz_amqp_empty_table) < 0
+		|| kz_amqp_error("Consuming", amqp_get_rpc_reply(kz_conn->conn))) {
+			ret = -RET_AMQP_ERROR;
+			goto error;
+	}
 
-    chan[idx].state = KZ_AMQP_CHANNEL_CONSUMING;
+	chan[idx].state = KZ_AMQP_CHANNEL_CONSUMING;
 	chan[idx].consumer = bind;
-    ret = idx;
- error:
- 	 kz_local_amqp_bytes_free(federated_exchange);
- 	 kz_local_amqp_bytes_free(federated_routing_key);
-     return ret;
+	ret = idx;
+error:
+	kz_local_amqp_bytes_free(federated_exchange);
+	kz_local_amqp_bytes_free(federated_routing_key);
+	return ret;
 }
 
 int kz_amqp_send_ex(kz_amqp_server_ptr srv, kz_amqp_cmd_ptr cmd, kz_amqp_channel_state state, int idx)