|
@@ -1551,6 +1551,7 @@ int kz_amqp_subscribe(struct sip_msg* msg, char* payload)
|
|
int no_ack = 1;
|
|
int no_ack = 1;
|
|
int federate = 0;
|
|
int federate = 0;
|
|
int wait_for_consumer_ack = 1;
|
|
int wait_for_consumer_ack = 1;
|
|
|
|
+ int consistent_worker = 0;
|
|
|
|
|
|
json_obj_ptr json_obj = NULL;
|
|
json_obj_ptr json_obj = NULL;
|
|
struct json_object* tmpObj = NULL;
|
|
struct json_object* tmpObj = NULL;
|
|
@@ -1607,6 +1608,11 @@ int kz_amqp_subscribe(struct sip_msg* msg, char* payload)
|
|
federate = json_object_get_int(tmpObj);
|
|
federate = json_object_get_int(tmpObj);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ tmpObj = kz_json_get_object(json_obj, "consistent-worker");
|
|
|
|
+ if(tmpObj != NULL) {
|
|
|
|
+ consistent_worker = json_object_get_int(tmpObj);
|
|
|
|
+ }
|
|
|
|
+
|
|
kz_amqp_bind_ptr bind = kz_amqp_bind_alloc(&exchange_s, &exchange_type_s, &queue_s, &routing_key_s);
|
|
kz_amqp_bind_ptr bind = kz_amqp_bind_alloc(&exchange_s, &exchange_type_s, &queue_s, &routing_key_s);
|
|
if(bind == NULL) {
|
|
if(bind == NULL) {
|
|
LM_ERR("Could not allocate bind struct\n");
|
|
LM_ERR("Could not allocate bind struct\n");
|
|
@@ -1620,6 +1626,7 @@ int kz_amqp_subscribe(struct sip_msg* msg, char* payload)
|
|
bind->no_ack = no_ack;
|
|
bind->no_ack = no_ack;
|
|
bind->wait_for_consumer_ack = wait_for_consumer_ack;
|
|
bind->wait_for_consumer_ack = wait_for_consumer_ack;
|
|
bind->federate = federate;
|
|
bind->federate = federate;
|
|
|
|
+ bind->consistent_worker = consistent_worker;
|
|
|
|
|
|
|
|
|
|
kz_amqp_binding_ptr binding = shm_malloc(sizeof(kz_amqp_binding));
|
|
kz_amqp_binding_ptr binding = shm_malloc(sizeof(kz_amqp_binding));
|
|
@@ -2571,6 +2578,7 @@ void kz_amqp_send_worker_event(int _kz_server_id, amqp_envelope_t* envelope, kz_
|
|
kz_amqp_consumer_delivery_ptr ptr = NULL;
|
|
kz_amqp_consumer_delivery_ptr ptr = NULL;
|
|
str* message_id = NULL;
|
|
str* message_id = NULL;
|
|
int idx = envelope->channel-1;
|
|
int idx = envelope->channel-1;
|
|
|
|
+ int worker = 0;
|
|
|
|
|
|
json_obj_ptr json_obj = kz_json_parse((char*)envelope->message.body.bytes);
|
|
json_obj_ptr json_obj = kz_json_parse((char*)envelope->message.body.bytes);
|
|
if (json_obj == NULL) {
|
|
if (json_obj == NULL) {
|
|
@@ -2617,12 +2625,21 @@ void kz_amqp_send_worker_event(int _kz_server_id, amqp_envelope_t* envelope, kz_
|
|
ptr->event_subkey = kz_amqp_bytes_dup(bind->event_subkey);
|
|
ptr->event_subkey = kz_amqp_bytes_dup(bind->event_subkey);
|
|
}
|
|
}
|
|
|
|
|
|
- consumer++;
|
|
|
|
- if(consumer >= dbk_consumer_workers) {
|
|
|
|
- consumer = 0;
|
|
|
|
- }
|
|
|
|
|
|
+ if(bind && bind->consistent_worker) {
|
|
|
|
+ str rk;
|
|
|
|
+ rk.s = (char*)envelope->routing_key.bytes;
|
|
|
|
+ rk.len = (int)envelope->routing_key.len;
|
|
|
|
+ worker = core_hash(&rk, NULL, dbk_consumer_workers);
|
|
|
|
+ LM_DBG("computed worker for %.*s is %d\n", rk.len, rk.s, worker);
|
|
|
|
+ } else {
|
|
|
|
+ consumer++;
|
|
|
|
+ if(consumer >= dbk_consumer_workers) {
|
|
|
|
+ consumer = 0;
|
|
|
|
+ }
|
|
|
|
+ worker = consumer;
|
|
|
|
+ }
|
|
|
|
|
|
- if (write(kz_worker_pipes[consumer], &ptr, sizeof(ptr)) != sizeof(ptr)) {
|
|
|
|
|
|
+ if (write(kz_worker_pipes[worker], &ptr, sizeof(ptr)) != sizeof(ptr)) {
|
|
LM_ERR("failed to send payload to consumer %d : %s\nPayload %s\n", consumer, strerror(errno), ptr->payload);
|
|
LM_ERR("failed to send payload to consumer %d : %s\nPayload %s\n", consumer, strerror(errno), ptr->payload);
|
|
goto error;
|
|
goto error;
|
|
}
|
|
}
|