|
@@ -2034,7 +2034,32 @@ void kz_amqp_cb_error(kz_amqp_cmd_ptr cmd)
|
|
|
int n = route_get(&main_rt, cmd->err_route);
|
|
|
struct action *a = main_rt.rlist[n];
|
|
|
tmb.t_continue(cmd->t_hash, cmd->t_label, a);
|
|
|
- kz_amqp_free_pipe_cmd(cmd);
|
|
|
+}
|
|
|
+
|
|
|
+int kz_send_worker_error_event(kz_amqp_cmd_ptr cmd)
|
|
|
+{
|
|
|
+ cmd->return_code = -1;
|
|
|
+ kz_amqp_consumer_delivery_ptr ptr = (kz_amqp_consumer_delivery_ptr) shm_malloc(sizeof(kz_amqp_consumer_delivery));
|
|
|
+ if(ptr == NULL) {
|
|
|
+ LM_ERR("NO MORE SHARED MEMORY!");
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+ memset(ptr, 0, sizeof(kz_amqp_consumer_delivery));
|
|
|
+ ptr->cmd = cmd;
|
|
|
+
|
|
|
+ consumer++;
|
|
|
+ if(consumer >= dbk_consumer_processes) {
|
|
|
+ consumer = 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (write(kz_worker_pipes[consumer], &ptr, sizeof(ptr)) != sizeof(ptr)) {
|
|
|
+ LM_ERR("failed to send payload to consumer %d : %s\nPayload %s\n", consumer, strerror(errno), cmd->payload);
|
|
|
+ kz_amqp_free_consumer_delivery(ptr);
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ return 1;
|
|
|
+
|
|
|
}
|
|
|
|
|
|
void kz_amqp_cmd_timeout_cb(int fd, short event, void *arg)
|
|
@@ -2047,7 +2072,7 @@ void kz_amqp_cmd_timeout_cb(int fd, short event, void *arg)
|
|
|
, retrieved_cmd ->message_id->len, retrieved_cmd ->message_id->s
|
|
|
);
|
|
|
if(retrieved_cmd->type == KZ_AMQP_CMD_ASYNC_CALL) {
|
|
|
- kz_amqp_cb_error(retrieved_cmd);
|
|
|
+ kz_send_worker_error_event(retrieved_cmd);
|
|
|
} else {
|
|
|
retrieved_cmd->return_code = -1;
|
|
|
lock_release(&retrieved_cmd->lock);
|
|
@@ -2577,6 +2602,9 @@ void kz_amqp_send_worker_event(int _kz_server_id, amqp_envelope_t* envelope, kz_
|
|
|
}
|
|
|
if(idx < dbk_channels) {
|
|
|
cmd = kz_cmd_retrieve(message_id);
|
|
|
+ if(cmd)
|
|
|
+ cmd->return_code = AMQP_RESPONSE_NORMAL;
|
|
|
+
|
|
|
/*
|
|
|
if(cmd != NULL) {
|
|
|
cmd->return_code = 0;
|
|
@@ -2791,8 +2819,13 @@ void kz_amqp_consumer_worker_cb(int fd, short event, void *arg)
|
|
|
LM_DBG("consumer %d received payload %s\n", my_pid(), cmd->payload);
|
|
|
|
|
|
if(cmd->cmd) {
|
|
|
- kz_amqp_set_last_result(cmd->payload);
|
|
|
- kz_amqp_cb_ok(cmd->cmd);
|
|
|
+ if(cmd->cmd->return_code == AMQP_RESPONSE_NORMAL) {
|
|
|
+ kz_amqp_set_last_result(cmd->payload);
|
|
|
+ kz_amqp_cb_ok(cmd->cmd);
|
|
|
+ } else {
|
|
|
+ kz_amqp_reset_last_result();
|
|
|
+ kz_amqp_cb_error(cmd->cmd);
|
|
|
+ }
|
|
|
} else {
|
|
|
kz_amqp_consumer_event(cmd->payload, cmd->event_key, cmd->event_subkey);
|
|
|
}
|