|
@@ -263,7 +263,7 @@ int set_non_blocking(int fd)
|
|
|
/**
|
|
|
*
|
|
|
*/
|
|
|
-int nsq_consumer_worker_proc(int cmd_pipe, char *topic, char *channel)
|
|
|
+int nsq_consumer_worker_proc(int cmd_pipe, char *topic, char *channel, int max_in_flight)
|
|
|
{
|
|
|
struct ev_loop *loop;
|
|
|
loop = ev_default_loop(0);
|
|
@@ -279,6 +279,7 @@ int nsq_consumer_worker_proc(int cmd_pipe, char *topic, char *channel)
|
|
|
LM_DBG("NSQ Worker connecting to NSQ Topic [%s] and NSQ Channel [%s]\n", topic, channel);
|
|
|
// setup the reader
|
|
|
rdr = new_nsq_reader(loop, topic, channel, (void *)ctx, NULL, NULL, NULL, nsq_message_handler);
|
|
|
+ rdr->max_in_flight = max_in_flight;
|
|
|
|
|
|
if (consumer_use_nsqd == 0) {
|
|
|
snprintf(address, 128, "%.*s", nsq_lookupd_address.len, nsq_lookupd_address.s);
|
|
@@ -300,6 +301,11 @@ static int mod_child_init(int rank)
|
|
|
int pid;
|
|
|
int i;
|
|
|
int workers = dbn_consumer_workers / nsq_topic_channel_counter;
|
|
|
+ int max_in_flight = 1;
|
|
|
+
|
|
|
+ if (nsq_max_in_flight > 1) {
|
|
|
+ max_in_flight = nsq_max_in_flight;
|
|
|
+ }
|
|
|
|
|
|
fire_init_event(rank);
|
|
|
|
|
@@ -318,7 +324,7 @@ static int mod_child_init(int rank)
|
|
|
return -1; /* error */
|
|
|
if (pid==0){
|
|
|
close(nsq_worker_pipes_fds[i*2+1]);
|
|
|
- return(nsq_consumer_worker_proc(nsq_worker_pipes_fds[i*2], DEFAULT_TOPIC, DEFAULT_CHANNEL));
|
|
|
+ return(nsq_consumer_worker_proc(nsq_worker_pipes_fds[i*2], DEFAULT_TOPIC, DEFAULT_CHANNEL, max_in_flight));
|
|
|
}
|
|
|
}
|
|
|
} else {
|
|
@@ -329,7 +335,7 @@ static int mod_child_init(int rank)
|
|
|
return -1; /* error */
|
|
|
if (pid==0){
|
|
|
close(nsq_worker_pipes_fds[i*2+1]);
|
|
|
- return(nsq_consumer_worker_proc(nsq_worker_pipes_fds[i*2], tc->topic, tc->channel));
|
|
|
+ return(nsq_consumer_worker_proc(nsq_worker_pipes_fds[i*2], tc->topic, tc->channel, max_in_flight));
|
|
|
}
|
|
|
}
|
|
|
tc = tc->next;
|