浏览代码

dmq: Let the handler know about the sending node

Try to find a node based on the from uri of the incoming request and hand
it to the request handler.
Alex Hermann 11 年之前
父节点
当前提交
c2dcf4dbd8

+ 1 - 1
modules/dmq/notification_peer.c

@@ -187,7 +187,7 @@ int run_init_callbacks() {
 /**
  * @brief dmq notification callback
  */
-int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp)
+int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node)
 {
 	int nodes_recv;
 	str* response_body = NULL;

+ 1 - 1
modules/dmq/notification_peer.h

@@ -37,7 +37,7 @@ extern str notification_content_type;
 extern int *dmq_init_callback_done;
 
 int add_notification_peer();
-int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp);
+int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node);
 int extract_node_list(dmq_node_list_t* update_list, struct sip_msg* msg);
 str* build_notification_body();
 int build_node_str(dmq_node_t* node, char* buf, int buflen);

+ 1 - 1
modules/dmq/peer.c

@@ -97,7 +97,7 @@ dmq_peer_t* find_peer(str peer_id)
 /**
  * @empty callback
  */
-int empty_peer_callback(struct sip_msg* msg, peer_reponse_t* resp)
+int empty_peer_callback(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node)
 {
 	return 0;
 }

+ 3 - 2
modules/dmq/peer.h

@@ -28,6 +28,7 @@
 
 #include <string.h>
 #include <stdlib.h>
+#include "dmqnode.h"
 #include "../../lock_ops.h"
 #include "../../str.h"
 #include "../../mem/mem.h"
@@ -41,7 +42,7 @@ typedef struct peer_response {
 	str body;
 } peer_reponse_t;
 
-typedef int(*peer_callback_t)(struct sip_msg*, peer_reponse_t* resp);
+typedef int(*peer_callback_t)(struct sip_msg*, peer_reponse_t* resp, dmq_node_t* node);
 typedef int(*init_callback_t)();
 
 typedef struct dmq_peer {
@@ -66,7 +67,7 @@ typedef dmq_peer_t* (*register_dmq_peer_t)(dmq_peer_t*);
 
 dmq_peer_t* add_peer(dmq_peer_list_t* peer_list, dmq_peer_t* peer);
 dmq_peer_t* find_peer(str peer_id);
-int empty_peer_callback(struct sip_msg* msg, peer_reponse_t* resp);
+int empty_peer_callback(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node);
 
 #endif
 

+ 11 - 1
modules/dmq/worker.c

@@ -29,6 +29,8 @@
 #include "../../data_lump_rpl.h"
 #include "../../mod_fix.h"
 #include "../../sip_msg_clone.h"
+#include "../../parser/parse_from.h"
+#include "../../parser/parse_to.h"
 
 /**
  * @brief set the body of a response
@@ -74,6 +76,7 @@ void worker_loop(int id)
 	dmq_job_t* current_job;
 	peer_reponse_t peer_response;
 	int ret_value;
+	dmq_node_t *dmq_node = NULL;
 
 	worker = &workers[id];
 	for(;;) {
@@ -88,7 +91,14 @@ void worker_loop(int id)
 			current_job = job_queue_pop(worker->queue);
 			/* job_queue_pop might return NULL if queue is empty */
 			if(current_job) {
-				ret_value = current_job->f(current_job->msg, &peer_response);
+				/* extract the from uri */
+				if (parse_from_header(current_job->msg) < 0) {
+					LM_ERR("bad sip message or missing From hdr\n");
+				} else {
+					dmq_node = find_dmq_node_uri(node_list, &((struct to_body*)current_job->msg->from->parsed)->uri);
+				}
+
+				ret_value = current_job->f(current_job->msg, &peer_response, dmq_node);
 				if(ret_value < 0) {
 					LM_ERR("running job failed\n");
 					continue;

+ 1 - 1
modules/htable/ht_dmq.c

@@ -89,7 +89,7 @@ int ht_dmq_broadcast(str* body) {
 /**
  * @brief ht dmq callback
  */
-int ht_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp)
+int ht_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node)
 {
 	int content_length;
 	str body;

+ 1 - 1
modules/htable/ht_dmq.h

@@ -40,7 +40,7 @@ typedef enum {
 } ht_dmq_action_t;
 
 int ht_dmq_initialize();
-int ht_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp);
+int ht_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node);
 int ht_dmq_replicate_action(ht_dmq_action_t action, str* htname, str* cname, int type, int_str* val, int mode);
 int ht_dmq_replay_action(ht_dmq_action_t action, str* htname, str* cname, int type, int_str* val, int mode);
 int ht_dmq_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param);