2
0
Эх сурвалжийг харах

kazoo: max size of routing key

lazedo 8 жил өмнө
parent
commit
56e07cf2bb

+ 5 - 1
src/modules/kazoo/kazoo.c

@@ -43,6 +43,8 @@
 
 #define AMQP_WORKERS_RANKING PROC_XWORKER
 
+#define MAX_ROUTING_KEY_SIZE 255
+
 static int mod_init(void);
 static int  mod_child_init(int rank);
 static int fire_init_event(int rank);
@@ -58,7 +60,6 @@ int dbk_auth_wait_timeout = 3;
 int dbk_reconn_retries = 8;
 int dbk_presentity_phtable_size = 4096;
 int dbk_command_table_size = 2048;
-
 int dbk_use_federated_exchange = 0;
 str dbk_federated_exchange = str_init("federation");
 str dbk_primary_zone_name = str_init("local");
@@ -93,6 +94,8 @@ int dbk_use_hearbeats = 0;
 int dbk_single_consumer_on_reconnect = 1;
 int dbk_consume_messages_on_reconnect = 1;
 
+int kz_max_routing_key_size = MAX_ROUTING_KEY_SIZE;
+
 int startup_time = 0;
 
 int *kz_worker_pipes_fds = NULL;
@@ -199,6 +202,7 @@ static param_export_t params[] = {
     {"amqps_verify_peer", INT_PARAM, &kz_amqps_verify_peer},
     {"amqps_verify_hostname", INT_PARAM, &kz_amqps_verify_hostname},
 	{"pua_lock_type", INT_PARAM, &kz_pua_lock_type},
+    {"amqp_routing_key_max_size", INT_PARAM, &kz_max_routing_key_size},
     {0, 0, 0}
 };
 

+ 51 - 21
src/modules/kazoo/kz_amqp.c

@@ -80,6 +80,8 @@ extern int kz_amqps_verify_hostname;
 
 extern pv_spec_t kz_query_timeout_spec;
 
+extern int kz_max_routing_key_size;
+
 const amqp_bytes_t kz_amqp_empty_bytes = { 0, NULL };
 const amqp_table_t kz_amqp_empty_table = { 0, NULL };
 
@@ -1233,6 +1235,11 @@ int kz_amqp_publish(struct sip_msg* msg, char* exchange, char* routing_key, char
 			return -1;
 		}
 
+		if (routing_key_s.len > kz_max_routing_key_size) {
+			LM_ERR("routing_key size (%d) > max %d\n", routing_key_s.len, kz_max_routing_key_size);
+			return -1;
+		}
+
 		struct json_object *j = json_tokener_parse(json_s.s);
 
 		if (is_error(j)) {
@@ -1291,6 +1298,11 @@ int kz_amqp_async_query(struct sip_msg* msg, char* _exchange, char* _routing_key
 		  goto error;
 	  }
 
+	  if (routing_key_s.len > kz_max_routing_key_size) {
+		  LM_ERR("routing_key size (%d) > max %d\n", routing_key_s.len, kz_max_routing_key_size);
+		  return -1;
+	  }
+
 	  json_obj = json_tokener_parse(json_s.s);
 
 	  if (is_error(json_obj)) {
@@ -1439,6 +1451,11 @@ int kz_amqp_query_ex(struct sip_msg* msg, char* exchange, char* routing_key, cha
 			return -1;
 		}
 
+		if (routing_key_s.len > kz_max_routing_key_size) {
+			LM_ERR("routing_key size (%d) > max %d\n", routing_key_s.len, kz_max_routing_key_size);
+			return -1;
+		}
+
 		struct json_object *j = json_tokener_parse(json_s.s);
 
 		if (is_error(j)) {
@@ -1992,35 +2009,39 @@ error:
 
 #define hexint(C) (C < 10?('0' + C):('A'+ C - 10))
 
-char *kz_amqp_util_encode(const str * key, char *dest) {
+void kz_amqp_util_encode(const str * key, char *pdest) {
+    char *p, *end;
+	char *dest = pdest;
     if ((key->len == 1) && (key->s[0] == '#' || key->s[0] == '*')) {
-	*dest++ = key->s[0];
-	return dest;
+    	*dest++ = key->s[0];
+    	return;
     }
-    char *p, *end;
-    for (p = key->s, end = key->s + key->len; p < end; p++) {
-	if (KEY_SAFE(*p)) {
-	    *dest++ = *p;
-	} else if (*p == '.') {
-	    memcpy(dest, "\%2E", 3);
-	    dest += 3;
-	} else if (*p == ' ') {
-	    *dest++ = '+';
-	} else {
-	    *dest++ = '%';
-	    sprintf(dest, "%c%c", hexint(HI4(*p)), hexint(LO4(*p)));
-	    dest += 2;
-	}
+    for (p = key->s, end = key->s + key->len; p < end && ((dest - pdest) < (kz_max_routing_key_size - 1)); p++) {
+		if (KEY_SAFE(*p)) {
+			*dest++ = *p;
+		} else if (*p == '.') {
+			memcpy(dest, "\%2E", 3);
+			dest += 3;
+		} else if (*p == ' ') {
+			*dest++ = '+';
+		} else {
+			*dest++ = '%';
+			sprintf(dest, "%c%c", hexint(HI4(*p)), hexint(LO4(*p)));
+			dest += 2;
+		}
     }
     *dest = '\0';
-    return dest;
 }
 
 int kz_amqp_encode_ex(str* unencoded, pv_value_p dst_val)
 {
-	char routing_key_buff[256];
-	memset(routing_key_buff,0, sizeof(routing_key_buff));
-	kz_amqp_util_encode(unencoded, routing_key_buff);
+	char *routing_key_buff = (char*)pkg_malloc(kz_max_routing_key_size);
+	if(routing_key_buff == NULL) {
+		LM_ERR("no more private memory allocating for amqp_encode\n");
+	} else {
+		memset(routing_key_buff,0, sizeof(routing_key_buff));
+		kz_amqp_util_encode(unencoded, routing_key_buff);
+	}
 
 	int len = strlen(routing_key_buff);
 	dst_val->rs.s = pkg_malloc(len+1);
@@ -2029,6 +2050,10 @@ int kz_amqp_encode_ex(str* unencoded, pv_value_p dst_val)
 	dst_val->rs.len = len;
 	dst_val->flags = PV_VAL_STR | PV_VAL_PKG;
 
+	if(routing_key_buff) {
+		pkg_free(routing_key_buff);
+	}
+
 	return 1;
 
 }
@@ -2045,6 +2070,11 @@ int kz_amqp_encode(struct sip_msg* msg, char* unencoded, char* encoded)
 		return -1;
 	}
 
+	if (unencoded_s.len > kz_max_routing_key_size) {
+		LM_ERR("routing_key size (%d) > max %d\n", unencoded_s.len, kz_max_routing_key_size);
+		return -1;
+	}
+
 	kz_amqp_encode_ex(&unencoded_s, &dst_val);
 	dst_pv->setf(msg, &dst_pv->pvp, (int)EQ_T, &dst_val);