|
@@ -2281,12 +2281,18 @@ int kz_amqp_send_receive(kz_amqp_server_ptr srv, kz_amqp_cmd_ptr cmd )
|
|
|
}
|
|
|
|
|
|
char* eventData = NULL;
|
|
|
+char* eventKey = NULL;
|
|
|
|
|
|
int kz_pv_get_event_payload(struct sip_msg *msg, pv_param_t *param, pv_value_t *res)
|
|
|
{
|
|
|
return eventData == NULL ? pv_get_null(msg, param, res) : pv_get_strzval(msg, param, res, eventData);
|
|
|
}
|
|
|
|
|
|
+int kz_pv_get_event_routing_key(struct sip_msg *msg, pv_param_t *param, pv_value_t *res)
|
|
|
+{
|
|
|
+ return eventKey == NULL ? pv_get_null(msg, param, res) : pv_get_strzval(msg, param, res, eventKey);
|
|
|
+}
|
|
|
+
|
|
|
int kz_amqp_consumer_fire_event(char *eventkey)
|
|
|
{
|
|
|
sip_msg_t *fmsg;
|
|
@@ -2314,32 +2320,35 @@ int kz_amqp_consumer_fire_event(char *eventkey)
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
-void kz_amqp_consumer_event(char *payload, char* event_key, char* event_subkey)
|
|
|
+void kz_amqp_consumer_event(kz_amqp_consumer_delivery_ptr Evt)
|
|
|
{
|
|
|
json_obj_ptr json_obj = NULL;
|
|
|
str ev_name = {0, 0}, ev_category = {0, 0};
|
|
|
char buffer[512];
|
|
|
char * p;
|
|
|
|
|
|
- eventData = payload;
|
|
|
+ eventData = Evt->payload;
|
|
|
+ if(Evt->routing_key) {
|
|
|
+ eventKey = Evt->routing_key->s;
|
|
|
+ }
|
|
|
|
|
|
- json_obj = kz_json_parse(payload);
|
|
|
+ json_obj = kz_json_parse(Evt->payload);
|
|
|
if (json_obj == NULL)
|
|
|
return;
|
|
|
|
|
|
- char* key = (event_key == NULL ? dbk_consumer_event_key.s : event_key);
|
|
|
- char* subkey = (event_subkey == NULL ? dbk_consumer_event_subkey.s : event_subkey);
|
|
|
+ char* key = (Evt->event_key == NULL ? dbk_consumer_event_key.s : Evt->event_key);
|
|
|
+ char* subkey = (Evt->event_subkey == NULL ? dbk_consumer_event_subkey.s : Evt->event_subkey);
|
|
|
|
|
|
json_extract_field(key, ev_category);
|
|
|
- if(ev_category.len == 0 && event_key) {
|
|
|
- ev_category.s = event_key;
|
|
|
- ev_category.len = strlen(event_key);
|
|
|
+ if(ev_category.len == 0 && Evt->event_key) {
|
|
|
+ ev_category.s = Evt->event_key;
|
|
|
+ ev_category.len = strlen(Evt->event_key);
|
|
|
}
|
|
|
|
|
|
json_extract_field(subkey, ev_name);
|
|
|
- if(ev_name.len == 0 && event_subkey) {
|
|
|
- ev_name.s = event_subkey;
|
|
|
- ev_name.len = strlen(event_subkey);
|
|
|
+ if(ev_name.len == 0 && Evt->event_subkey) {
|
|
|
+ ev_name.s = Evt->event_subkey;
|
|
|
+ ev_name.len = strlen(Evt->event_subkey);
|
|
|
}
|
|
|
|
|
|
sprintf(buffer, "kazoo:consumer-event-%.*s-%.*s",ev_category.len, ev_category.s, ev_name.len, ev_name.s);
|
|
@@ -2370,6 +2379,7 @@ void kz_amqp_consumer_event(char *payload, char* event_key, char* event_subkey)
|
|
|
json_object_put(json_obj);
|
|
|
|
|
|
eventData = NULL;
|
|
|
+ eventKey = NULL;
|
|
|
}
|
|
|
|
|
|
int check_timeout(struct timeval *now, struct timeval *start, struct timeval *timeout)
|
|
@@ -3190,7 +3200,8 @@ void kz_amqp_consumer_worker_cb(int fd, short event, void *arg)
|
|
|
lock_release(&cmd->lock);
|
|
|
}
|
|
|
} else {
|
|
|
- kz_amqp_consumer_event(Evt->payload, Evt->event_key, Evt->event_subkey);
|
|
|
+// kz_amqp_consumer_event(Evt->payload, Evt->event_key, Evt->event_subkey);
|
|
|
+ kz_amqp_consumer_event(Evt);
|
|
|
}
|
|
|
|
|
|
kz_amqp_free_consumer_delivery(Evt);
|