|
@@ -113,6 +113,20 @@ amqp_bytes_t kz_amqp_bytes_dup_from_str(str *src)
|
|
|
return kz_amqp_bytes_malloc_dup(amqp_cstring_bytes(src->s));
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+void kz_amqp_free_consumer_delivery(kz_amqp_consumer_delivery_ptr ptr)
|
|
|
+{
|
|
|
+ if(ptr == NULL)
|
|
|
+ return;
|
|
|
+ if(ptr->payload)
|
|
|
+ shm_free(ptr->payload);
|
|
|
+ if(ptr->event_key)
|
|
|
+ shm_free(ptr->event_key);
|
|
|
+ if(ptr->event_subkey)
|
|
|
+ shm_free(ptr->event_subkey);
|
|
|
+ shm_free(ptr);
|
|
|
+}
|
|
|
+
|
|
|
void kz_amqp_free_bind(kz_amqp_bind_ptr bind)
|
|
|
{
|
|
|
if(bind == NULL)
|
|
@@ -125,6 +139,10 @@ void kz_amqp_free_bind(kz_amqp_bind_ptr bind)
|
|
|
kz_amqp_bytes_free(bind->queue);
|
|
|
if(bind->routing_key.bytes)
|
|
|
kz_amqp_bytes_free(bind->routing_key);
|
|
|
+ if(bind->event_key.bytes)
|
|
|
+ kz_amqp_bytes_free(bind->event_key);
|
|
|
+ if(bind->event_subkey.bytes)
|
|
|
+ kz_amqp_bytes_free(bind->event_subkey);
|
|
|
shm_free(bind);
|
|
|
}
|
|
|
|
|
@@ -180,7 +198,7 @@ kz_amqp_cmd_ptr kz_amqp_alloc_pipe_cmd()
|
|
|
return cmd;
|
|
|
}
|
|
|
|
|
|
-kz_amqp_bind_ptr kz_amqp_bind_alloc(str* exchange, str* exchange_type, str* queue, str* routing_key )
|
|
|
+kz_amqp_bind_ptr kz_amqp_bind_alloc_ex(str* exchange, str* exchange_type, str* queue, str* routing_key, str* event_key, str* event_subkey )
|
|
|
{
|
|
|
kz_amqp_bind_ptr bind = NULL;
|
|
|
|
|
@@ -223,6 +241,22 @@ kz_amqp_bind_ptr kz_amqp_bind_alloc(str* exchange, str* exchange_type, str* queu
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ if(event_key != NULL) {
|
|
|
+ bind->event_key = kz_amqp_bytes_dup_from_str(event_key);
|
|
|
+ if (bind->event_key.bytes == NULL) {
|
|
|
+ LM_ERR("Out of memory allocating for routing key\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if(event_subkey != NULL) {
|
|
|
+ bind->event_subkey = kz_amqp_bytes_dup_from_str(event_subkey);
|
|
|
+ if (bind->event_subkey.bytes == NULL) {
|
|
|
+ LM_ERR("Out of memory allocating for routing key\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
return bind;
|
|
|
|
|
|
error:
|
|
@@ -230,6 +264,11 @@ error:
|
|
|
return NULL;
|
|
|
}
|
|
|
|
|
|
+kz_amqp_bind_ptr kz_amqp_bind_alloc(str* exchange, str* exchange_type, str* queue, str* routing_key )
|
|
|
+{
|
|
|
+ return kz_amqp_bind_alloc_ex(exchange, exchange_type, queue, routing_key, NULL, NULL );
|
|
|
+}
|
|
|
+
|
|
|
void kz_amqp_init_connection_pool() {
|
|
|
if(kz_pool == NULL) {
|
|
|
kz_pool = (kz_amqp_conn_pool_ptr) shm_malloc(sizeof(kz_amqp_conn_pool));
|
|
@@ -923,6 +962,8 @@ int kz_amqp_subscribe(struct sip_msg* msg, char* payload)
|
|
|
str queue_s;
|
|
|
str routing_key_s;
|
|
|
str payload_s;
|
|
|
+ str key_s;
|
|
|
+ str subkey_s;
|
|
|
int passive = 0;
|
|
|
int durable = 0;
|
|
|
int exclusive = 0;
|
|
@@ -950,6 +991,8 @@ int kz_amqp_subscribe(struct sip_msg* msg, char* payload)
|
|
|
json_extract_field("type", exchange_type_s);
|
|
|
json_extract_field("queue", queue_s);
|
|
|
json_extract_field("routing", routing_key_s);
|
|
|
+ json_extract_field("event_key", key_s);
|
|
|
+ json_extract_field("event_subkey", subkey_s);
|
|
|
|
|
|
tmpObj = json_object_object_get(json_obj, "passive");
|
|
|
if(tmpObj != NULL) {
|
|
@@ -1397,7 +1440,7 @@ int kz_amqp_consumer_fire_event(char *eventkey)
|
|
|
|
|
|
}
|
|
|
|
|
|
-void kz_amqp_consumer_event(int child_no, char *payload)
|
|
|
+void kz_amqp_consumer_event(int child_no, char *payload, char* event_key, char* event_subkey)
|
|
|
{
|
|
|
json_obj_ptr json_obj = NULL;
|
|
|
str ev_name = {0, 0}, ev_category = {0, 0};
|
|
@@ -1414,20 +1457,33 @@ void kz_amqp_consumer_event(int child_no, char *payload)
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- json_extract_field(dbk_consumer_event_key.s, ev_category);
|
|
|
- json_extract_field(dbk_consumer_event_subkey.s, ev_name);
|
|
|
+ char* key = (event_key == NULL ? dbk_consumer_event_key.s : event_key);
|
|
|
+ char* subkey = (event_subkey == NULL ? dbk_consumer_event_subkey.s : event_subkey);
|
|
|
+
|
|
|
+ json_extract_field(key, ev_category);
|
|
|
+ json_extract_field(subkey, ev_name);
|
|
|
|
|
|
sprintf(buffer, "kazoo:consumer-event-%.*s-%.*s",ev_category.len, ev_category.s, ev_name.len, ev_name.s);
|
|
|
for (p=buffer ; *p; ++p) *p = tolower(*p);
|
|
|
for (p=buffer ; *p; ++p) if(*p == '_') *p = '-';
|
|
|
if(kz_amqp_consumer_fire_event(buffer) != 0) {
|
|
|
- sprintf(buffer, "kazoo:consumer-event-%.*s-%.*s",dbk_consumer_event_key.len, dbk_consumer_event_key.s, dbk_consumer_event_subkey.len, dbk_consumer_event_subkey.s);
|
|
|
+ sprintf(buffer, "kazoo:consumer-event-%.*s",ev_category.len, ev_category.s);
|
|
|
for (p=buffer ; *p; ++p) *p = tolower(*p);
|
|
|
for (p=buffer ; *p; ++p) if(*p == '_') *p = '-';
|
|
|
if(kz_amqp_consumer_fire_event(buffer) != 0) {
|
|
|
- sprintf(buffer, "kazoo:consumer-event");
|
|
|
+ sprintf(buffer, "kazoo:consumer-event-%s-%s", key, subkey);
|
|
|
+ for (p=buffer ; *p; ++p) *p = tolower(*p);
|
|
|
+ for (p=buffer ; *p; ++p) if(*p == '_') *p = '-';
|
|
|
if(kz_amqp_consumer_fire_event(buffer) != 0) {
|
|
|
- LM_ERR("kazoo:consumer-event not found");
|
|
|
+ sprintf(buffer, "kazoo:consumer-event-%s", key);
|
|
|
+ for (p=buffer ; *p; ++p) *p = tolower(*p);
|
|
|
+ for (p=buffer ; *p; ++p) if(*p == '_') *p = '-';
|
|
|
+ if(kz_amqp_consumer_fire_event(buffer) != 0) {
|
|
|
+ sprintf(buffer, "kazoo:consumer-event");
|
|
|
+ if(kz_amqp_consumer_fire_event(buffer) != 0) {
|
|
|
+ LM_ERR("kazoo:consumer-event not found");
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -1462,7 +1518,7 @@ void kz_amqp_consumer_loop(int child_no)
|
|
|
kz_amqp_consumer_delivery_ptr ptr;
|
|
|
if(read(data_pipe, &ptr, sizeof(ptr)) == sizeof(ptr)) {
|
|
|
LM_DBG("consumer %d received payload %s\n", child_no, ptr->payload);
|
|
|
- kz_amqp_consumer_event(child_no, ptr->payload);
|
|
|
+ kz_amqp_consumer_event(child_no, ptr->payload, ptr->event_key, ptr->event_subkey);
|
|
|
if(ptr->channel > 0 && ptr->delivery_tag > 0) {
|
|
|
kz_amqp_cmd_ptr cmd = kz_amqp_alloc_pipe_cmd();
|
|
|
cmd->type = KZ_AMQP_ACK;
|
|
@@ -1472,8 +1528,7 @@ void kz_amqp_consumer_loop(int child_no)
|
|
|
LM_ERR("failed to send ack to AMQP Manager in process %d, write to command pipe: %s\n", getpid(), strerror(errno));
|
|
|
}
|
|
|
}
|
|
|
- shm_free(ptr->payload);
|
|
|
- shm_free(ptr);
|
|
|
+ kz_amqp_free_consumer_delivery(ptr);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -1494,8 +1549,9 @@ int check_timeout(struct timeval *now, struct timeval *start, struct timeval *ti
|
|
|
|
|
|
int consumer = 1;
|
|
|
|
|
|
-void kz_amqp_send_consumer_event_ex(char* payload, amqp_channel_t channel, uint64_t delivery_tag, int nextConsumer)
|
|
|
+void kz_amqp_send_consumer_event_ex(char* payload, char* event_key, char* event_subkey, amqp_channel_t channel, uint64_t delivery_tag, int nextConsumer)
|
|
|
{
|
|
|
+ int len = 0;
|
|
|
kz_amqp_consumer_delivery_ptr ptr = (kz_amqp_consumer_delivery_ptr) shm_malloc(sizeof(kz_amqp_consumer_delivery));
|
|
|
if(ptr == NULL) {
|
|
|
LM_ERR("NO MORE SHARED MEMORY!");
|
|
@@ -1505,7 +1561,8 @@ void kz_amqp_send_consumer_event_ex(char* payload, amqp_channel_t channel, uint6
|
|
|
ptr->channel = channel;
|
|
|
ptr->delivery_tag = delivery_tag;
|
|
|
ptr->payload = payload;
|
|
|
-
|
|
|
+ ptr->event_key = event_key;
|
|
|
+ ptr->event_subkey = event_subkey;
|
|
|
if (write(kz_pipe_fds[consumer*2+1], &ptr, sizeof(ptr)) != sizeof(ptr)) {
|
|
|
LM_ERR("failed to send payload to consumer %d : %s\nPayload %s\n", consumer, strerror(errno), payload);
|
|
|
}
|
|
@@ -1520,7 +1577,7 @@ void kz_amqp_send_consumer_event_ex(char* payload, amqp_channel_t channel, uint6
|
|
|
|
|
|
void kz_amqp_send_consumer_event(char* payload, int nextConsumer)
|
|
|
{
|
|
|
- kz_amqp_send_consumer_event_ex(payload, 0, 0, nextConsumer);
|
|
|
+ kz_amqp_send_consumer_event_ex(payload, NULL, NULL, 0, 0, nextConsumer);
|
|
|
}
|
|
|
|
|
|
void kz_amqp_fire_connection_event(char *event, char* host)
|
|
@@ -1732,6 +1789,8 @@ void kz_amqp_manager_loop(int child_no)
|
|
|
break;
|
|
|
case KZ_AMQP_CONSUMING:
|
|
|
kz_amqp_send_consumer_event_ex(kz_amqp_bytes_dup(envelope.message.body),
|
|
|
+ kz_amqp_bytes_dup(channels[idx].consumer->event_key),
|
|
|
+ kz_amqp_bytes_dup(channels[idx].consumer->event_subkey),
|
|
|
channels[idx].consumer->no_ack ? 0 : envelope.channel,
|
|
|
channels[idx].consumer->no_ack ? 0 : envelope.delivery_tag,
|
|
|
(firstLoop && dbk_single_consumer_on_reconnect) ? 0 : 1);
|