Browse Source

kazoo : fix null pointer usage

also some code cleanup
Luis Azedo 10 years ago
parent
commit
0b54fb31be
2 changed files with 25 additions and 171 deletions
  1. 1 1
      modules/kazoo/kazoo.c
  2. 24 170
      modules/kazoo/kz_amqp.c

+ 1 - 1
modules/kazoo/kazoo.c

@@ -60,7 +60,7 @@ int dbk_reconn_retries = 8;
 int dbk_presentity_phtable_size = 4096;
 int dbk_command_table_size = 2048;
 
-int dbk_use_federated_exchange = 1;
+int dbk_use_federated_exchange = 0;
 str dbk_federated_exchange = str_init("federation");
 str dbk_primary_zone_name = str_init("local");
 

+ 24 - 170
modules/kazoo/kz_amqp.c

@@ -23,8 +23,6 @@
 
 struct tm_binds tmb;
 
-//kz_amqp_connection_pool_ptr kz_pool = NULL;
-
 kz_amqp_bindings_ptr kz_bindings = NULL;
 int bindings_count = 0;
 
@@ -34,7 +32,6 @@ typedef struct json_object *json_obj_ptr;
 
 extern pv_spec_t kz_query_result_spec;
 
-//kz_amqp_channel_ptr channels = NULL;
 extern int *kz_worker_pipes;
 extern int kz_cmd_pipe;
 
@@ -156,23 +153,6 @@ static inline str* kz_local_str_dup(str* src)
 	return dst;
 }
 
-/*
-static char *kz_local_amqp_string_dup(char *src)
-{
-	char *res;
-	int sz;
-	if (!src )
-		return NULL;
-
-	sz = strlen(src);
-	if (!(res = (char *) pkg_malloc(sz + 1)))
-		return NULL;
-	strncpy(res, src, sz);
-	res[sz] = 0;
-	return res;
-}
-*/
-
 char *kz_amqp_bytes_dup(amqp_bytes_t bytes)
 {
 	char *res;
@@ -789,68 +769,6 @@ int kz_amqp_channel_open(kz_amqp_conn_ptr rmq, amqp_channel_t channel) {
     return 0;
 }
 
-kz_amqp_conn_ptr kz_amqp_get_connection() {
-	return NULL;
-
-	/*
-	kz_amqp_conn_ptr ptr = NULL;
-	if(kz_pool == NULL) {
-		return NULL;
-	}
-//	lock_get(&kz_pool->lock);
-
-	ptr = kz_pool->head;
-
-	if(kz_pool->current != NULL) {
-		ptr = kz_pool->current;
-	}
-
-	if(ptr->socket == NULL )
-	{
-	while(ptr != NULL) {
-		if(kz_amqp_connection_open(ptr) == 0) {
-			kz_pool->current = ptr;
-			break;
-		}
-		ptr = ptr->next;
-	}
-	}
-
-//	lock_release(&kz_pool->lock);
-
-   	return ptr;
-   	*/
-}
-
-kz_amqp_conn_ptr kz_amqp_get_next_connection() {
-	return NULL;
-	/*
-	kz_amqp_conn_ptr ptr = NULL;
-	if(kz_pool == NULL) {
-		return NULL;
-	}
-
-	if(kz_pool->current != NULL) {
-		ptr = kz_pool->current->next;
-	}
-
-	if(ptr == NULL) {
-		ptr = kz_pool->head;
-	}
-
-	while(ptr != NULL) {
-		if(kz_amqp_connection_open(ptr) == 0) {
-			kz_pool->current = ptr;
-			break;
-		}
-		ptr = ptr->next;
-	}
-
-
-   	return ptr;
-   	*/
-}
-
 int kz_amqp_consume_error(kz_amqp_conn_ptr ptr)
 {
 	amqp_connection_state_t conn = ptr->conn;
@@ -1008,7 +926,6 @@ int kz_amqp_pipe_send(str *str_exchange, str *str_routing_key, str *str_payload)
 	if(lock_init(&cmd->lock)==NULL)
 	{
 		LM_ERR("cannot init the lock for publishing in process %d\n", getpid());
-//		lock_dealloc(&cmd->lock);
 		goto error;
 	}
 	lock_get(&cmd->lock);
@@ -1175,7 +1092,6 @@ int kz_amqp_async_query(struct sip_msg* msg, char* _exchange, char* _routing_key
       int ret = -1;
       json_obj_ptr json_obj = NULL;
 	  kz_amqp_cmd_ptr cmd = NULL;
-//	  json_obj_ptr json_body = NULL;
 	  unsigned int hash_index = 0;
 	  unsigned int label = 0;
 	  tm_cell_t *t = 0;
@@ -1281,6 +1197,7 @@ int kz_amqp_async_query(struct sip_msg* msg, char* _exchange, char* _routing_key
 			lock_dealloc(&cmd->lock);
 			goto error;
 		}
+		lock_get(&cmd->lock);
 		cmd->type = KZ_AMQP_CMD_ASYNC_CALL;
 		cmd->consumer = getpid();
 		if (write(kz_cmd_pipe, &cmd, sizeof(cmd)) != sizeof(cmd)) {
@@ -2146,18 +2063,6 @@ error:
 	return 0;
 }
 
-/*
-void kz_amqp_stop_cmd_timer(kz_amqp_cmd_ptr cmd)
-{
-	if (cmd->timer_ev) {
-			close(cmd->timerfd);
-			event_del(cmd->timer_ev);
-			pkg_free(cmd->timer_ev);
-		} else {
-			LM_ERR("No timer for message id %.*s\n", cmd->message_id->len, cmd->message_id->s);
-		}
-}
-*/
 
 /* check timeouts */
 int kz_amqp_timeout_proc()
@@ -2284,7 +2189,6 @@ void kz_amqp_reconnect_cb(int fd, short event, void *arg)
 
 int kz_amqp_handle_server_failure(kz_amqp_conn_ptr connection)
 {
-//	LM_INFO("Setting timer to reconnect to %s on port %d in %d seconds.\n", server->host, server->port, JSONRPC_RECONNECT_INTERVAL);
 
 	connection->state = KZ_AMQP_CONNECTION_FAILURE;
 	int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
@@ -2295,7 +2199,6 @@ int kz_amqp_handle_server_failure(kz_amqp_conn_ptr connection)
 	}
 
 	struct itimerspec *itime = pkg_malloc(sizeof(struct itimerspec));
-//	CHECK_MALLOC(itime);
 	itime->it_interval.tv_sec = 0;
 	itime->it_interval.tv_nsec = 0;
 
@@ -2422,7 +2325,6 @@ void kz_amqp_publisher_proc_cb(int fd, short event, void *arg)
 			LM_DBG("amqp message id %.*s not found.\n", cmd->message_id->len, cmd->message_id->s);
 			kz_amqp_free_pipe_cmd(cmd);
 		} else {
-//			kz_amqp_stop_cmd_timer(retrieved_cmd);
 			retrieved_cmd->return_code = cmd->return_code;
 			retrieved_cmd->return_payload = cmd->return_payload;
 			cmd->return_payload = NULL;
@@ -2572,13 +2474,6 @@ void kz_amqp_send_worker_event(int _kz_server_id, amqp_envelope_t* envelope, kz_
     kz_amqp_consumer_delivery_ptr ptr = NULL;
     str* message_id = NULL;
     int idx = envelope->channel-1;
-    /*
-	char* payload = kz_amqp_bytes_dup(envelope->message.body);
-	if(!payload) {
-		LM_ERR("error alocating memory\n");
-		return;
-	}
-	*/
 
 	json_obj_ptr json_obj = kz_json_parse((char*)envelope->message.body.bytes);
     if (json_obj == NULL) {
@@ -2605,13 +2500,6 @@ void kz_amqp_send_worker_event(int _kz_server_id, amqp_envelope_t* envelope, kz_
 			cmd = kz_cmd_retrieve(message_id);
 			if(cmd)
 				cmd->return_code = AMQP_RESPONSE_NORMAL;
-
-			/*
-			if(cmd != NULL) {
-				cmd->return_code = 0;
-				cmd->return_payload = kz_amqp_string_dup((char*)json_object_to_json_string(json_obj));
-			}
-			*/
 		}
     }
 
@@ -2669,34 +2557,26 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr)
 	if(server_ptr->zone == kz_amqp_get_primary_zone())
 		channel_base = dbk_channels;
 
-//    server_ptr->consumer = (kz_amqp_conn_ptr)pkg_malloc(sizeof(kz_amqp_conn));
-//    if(server_ptr->consumer == NULL)
 	consumer = (kz_amqp_conn_ptr)pkg_malloc(sizeof(kz_amqp_conn));
     if(consumer == NULL)
     {
     	LM_ERR("NO MORE PACKAGE MEMORY\n");
     	return 1;
     }
-//    memset(server_ptr->consumer, 0, sizeof(kz_amqp_conn));
-//    server_ptr->consumer->server = server_ptr;
     memset(consumer, 0, sizeof(kz_amqp_conn));
     consumer->server = server_ptr;
 
     consumer_channels = (kz_amqp_channel_ptr)pkg_malloc(sizeof(kz_amqp_channel)*bindings_count);
-//    server_ptr->consumer_channels = (kz_amqp_channel_ptr)pkg_malloc(sizeof(kz_amqp_channel)*bindings_count);
-//    if(server_ptr->consumer_channels == NULL)
     if(consumer_channels == NULL)
     {
     	LM_ERR("NO MORE PACKAGE MEMORY\n");
     	return 1;
     }
 	for(i=0; i < bindings_count; i++)
-//		server_ptr->consumer_channels[i].channel = dbk_channels + i + 1;
 		consumer_channels[i].channel = channel_base + i + 1;
 
     while(1) {
     	OK = 1;
-//   		if(kz_amqp_connection_open(server_ptr->consumer)) {
    		if(kz_amqp_connection_open(consumer)) {
    			LM_ERR("Error opening connection\n");
    			sleep(3);
@@ -2706,25 +2586,17 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr)
 
     	/* reset channels */
 
+		/* bind targeted channels */
     	for(i=0,channel_res=0; i < channel_base && channel_res == 0; i++) {
-			/* start cleanup */
-//    		server_ptr->channels[i].consumer = NULL;
-			/* end cleanup */
-
-			/* bind targeted channels */
-//			channel_res = kz_amqp_channel_open(server_ptr->consumer, server_ptr->channels[i].channel);
 			channel_res = kz_amqp_channel_open(consumer, server_ptr->channels[i].channel);
 			if(channel_res == 0) {
-//				kz_amqp_bind_targeted_channel(server_ptr->consumer, i);
 				kz_amqp_bind_targeted_channel(consumer, i);
 			}
     	}
 
+		/*  cleanup consumer channels */
     	for(i=0,channel_res=0; i < bindings_count && channel_res == 0; i++) {
-			/* start cleanup */
-//    		server_ptr->consumer_channels[i].consumer = NULL;
     		consumer_channels[i].consumer = NULL;
-			/* end cleanup */
     	}
 
     	i = 0;
@@ -2733,12 +2605,9 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr)
 			kz_amqp_binding_ptr binding = kz_bindings->head;
 			while(binding != NULL && OK) {
 				if(binding->bind->federate || server_ptr->zone == kz_amqp_get_primary_zone()) {
-//					channel_res = kz_amqp_channel_open(server_ptr->consumer, server_ptr->consumer_channels[i].channel);
 					channel_res = kz_amqp_channel_open(consumer, consumer_channels[i].channel);
 					if(channel_res == 0) {
-//						kz_amqp_bind_consumer(server_ptr->consumer, binding->bind, i, server_ptr->consumer_channels);
 						kz_amqp_bind_consumer(consumer, binding->bind, i, consumer_channels);
-//						server_ptr->consumer_channels[i].state = KZ_AMQP_CHANNEL_BINDED;
 						consumer_channels[i].state = KZ_AMQP_CHANNEL_BINDED;
 						i++;
 					} else {
@@ -2755,9 +2624,7 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr)
 		while(OK) {
 			payload = NULL;
 			amqp_envelope_t envelope;
-//			amqp_maybe_release_buffers(server_ptr->consumer->conn);
 			amqp_maybe_release_buffers(consumer->conn);
-//			amqp_rpc_reply_t reply = amqp_consume_message(server_ptr->consumer->conn, &envelope, NULL, 0);
 			amqp_rpc_reply_t reply = amqp_consume_message(consumer->conn, &envelope, NULL, 0);
 			switch(reply.reply_type) {
 			case AMQP_RESPONSE_LIBRARY_EXCEPTION:
@@ -2770,7 +2637,6 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr)
 					break;
 				case AMQP_STATUS_UNEXPECTED_STATE:
 					LM_DBG("AMQP_STATUS_UNEXPECTED_STATE\n");
-//					OK = kz_amqp_consume_error(server_ptr->consumer);
 					OK = kz_amqp_consume_error(consumer);
 					break;
 				default:
@@ -2785,36 +2651,15 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr)
 					kz_amqp_send_worker_event(server_ptr->id, &envelope, NULL);
 				} else {
 					idx = idx - channel_base;
-//					if(!server_ptr->consumer_channels[idx].consumer->no_ack ) {
 					if(!consumer_channels[idx].consumer->no_ack ) {
-//						if(amqp_basic_ack(server_ptr->consumer->conn, envelope.channel, envelope.delivery_tag, 0 ) < 0) {
 						if(amqp_basic_ack(consumer->conn, envelope.channel, envelope.delivery_tag, 0 ) < 0) {
 							LM_ERR("AMQP ERROR TRYING TO ACK A MSG\n");
 							OK = 0;
 						}
 					}
 					if(OK)
-//						kz_amqp_send_worker_event(server_ptr->id, &envelope, server_ptr->consumer_channels[idx].consumer);
 						kz_amqp_send_worker_event(server_ptr->id, &envelope, consumer_channels[idx].consumer);
 				}
-				/*
-				idx = envelope.channel-1;
-				if(idx < dbk_channels) {
-					kz_send_targeted_cmd(server_ptr->id, envelope.message.body);
-				} else {
-					idx = idx - dbk_channels;
-					kz_amqp_send_consumer_event_ex(maybe_add_consumer_key(server_ptr->id, envelope.message.body),
-						kz_amqp_bytes_dup(server_ptr->consumer_channels[idx].consumer->event_key),
-						kz_amqp_bytes_dup(server_ptr->consumer_channels[idx].consumer->event_subkey),
-						0, 0, 1);
-					if(!server_ptr->consumer_channels[idx].consumer->no_ack ) {
-						if(amqp_basic_ack(server_ptr->consumer->conn, envelope.channel, envelope.delivery_tag, 0 ) < 0) {
-							LM_ERR("AMQP ERROR TRYING TO ACK A MSG\n");
-							OK = 0;
-						}
-					}
-				}
-				*/
 				break;
 			case AMQP_RESPONSE_SERVER_EXCEPTION:
 				LM_ERR("AMQP_RESPONSE_SERVER_EXCEPTION in consume\n");
@@ -2829,7 +2674,6 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr)
 			amqp_destroy_envelope(&envelope);
 		}
 
-//    	kz_amqp_connection_close(server_ptr->consumer);
     	kz_amqp_connection_close(consumer);
 
     }
@@ -2838,27 +2682,37 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr)
 
 void kz_amqp_consumer_worker_cb(int fd, short event, void *arg)
 {
-	kz_amqp_consumer_delivery_ptr cmd;
-	if (read(fd, &cmd, sizeof(cmd)) != sizeof(cmd)) {
+	kz_amqp_cmd_ptr cmd = NULL;
+	kz_amqp_consumer_delivery_ptr Evt;
+	if (read(fd, &Evt, sizeof(Evt)) != sizeof(Evt)) {
 		LM_ERR("failed to read from command pipe: %s\n", strerror(errno));
 		return;
 	}
 
-	LM_DBG("consumer %d received payload %s\n", my_pid(), cmd->payload);
+	LM_DBG("consumer %d received payload %s\n", my_pid(), Evt->payload);
 
-	if(cmd->cmd) {
-		if(cmd->cmd->return_code == AMQP_RESPONSE_NORMAL) {
-			kz_amqp_set_last_result(cmd->payload);
-			kz_amqp_cb_ok(cmd->cmd);
+	if(Evt->cmd) {
+		cmd =Evt->cmd;
+		if(cmd->type == KZ_AMQP_CMD_ASYNC_CALL ) {
+			if(cmd->return_code == AMQP_RESPONSE_NORMAL) {
+				kz_amqp_set_last_result(Evt->payload);
+				kz_amqp_cb_ok(cmd);
+			} else {
+				kz_amqp_reset_last_result();
+				kz_amqp_cb_error(cmd);
+				LM_DBG("run error exiting consumer %d\n", my_pid());
+			}
 		} else {
-			kz_amqp_reset_last_result();
-			kz_amqp_cb_error(cmd->cmd);
+			cmd->return_payload = Evt->payload;
+			Evt->payload = NULL;
+			Evt->cmd = NULL;
+			lock_release(&cmd->lock);
 		}
 	} else {
-		kz_amqp_consumer_event(cmd->payload, cmd->event_key, cmd->event_subkey);
+		kz_amqp_consumer_event(Evt->payload, Evt->event_key, Evt->event_subkey);
 	}
 
-	kz_amqp_free_consumer_delivery(cmd);
+	kz_amqp_free_consumer_delivery(Evt);
 	LM_DBG("exiting consumer %d\n", my_pid());
 }