Emmanuel Schmidbauer 8 лет назад
Родитель
Сommit
6d0ec36a4a
1 измененных файлов с 9 добавлено и 3 удалено
  1. 9 3
      src/modules/nsq/nsq_mod.c

+ 9 - 3
src/modules/nsq/nsq_mod.c

@@ -263,7 +263,7 @@ int set_non_blocking(int fd)
 /**
  *
  */
-int nsq_consumer_worker_proc(int cmd_pipe, char *topic, char *channel)
+int nsq_consumer_worker_proc(int cmd_pipe, char *topic, char *channel, int max_in_flight)
 {
 	struct ev_loop *loop;
 	loop = ev_default_loop(0);
@@ -279,6 +279,7 @@ int nsq_consumer_worker_proc(int cmd_pipe, char *topic, char *channel)
 	LM_DBG("NSQ Worker connecting to NSQ Topic [%s] and NSQ Channel [%s]\n", topic, channel);
 	// setup the reader
 	rdr = new_nsq_reader(loop, topic, channel, (void *)ctx, NULL, NULL, NULL, nsq_message_handler);
+	rdr->max_in_flight = max_in_flight;
 
 	if (consumer_use_nsqd == 0) {
 		snprintf(address, 128, "%.*s", nsq_lookupd_address.len, nsq_lookupd_address.s);
@@ -300,6 +301,11 @@ static int mod_child_init(int rank)
 	int pid;
 	int i;
 	int workers = dbn_consumer_workers / nsq_topic_channel_counter;
+	int max_in_flight = 1;
+
+	if (nsq_max_in_flight > 1) {
+		max_in_flight = nsq_max_in_flight;
+	}
 
 	fire_init_event(rank);
 
@@ -318,7 +324,7 @@ static int mod_child_init(int rank)
 					return -1; /* error */
 				if (pid==0){
 					close(nsq_worker_pipes_fds[i*2+1]);
-					return(nsq_consumer_worker_proc(nsq_worker_pipes_fds[i*2], DEFAULT_TOPIC, DEFAULT_CHANNEL));
+					return(nsq_consumer_worker_proc(nsq_worker_pipes_fds[i*2], DEFAULT_TOPIC, DEFAULT_CHANNEL, max_in_flight));
 				}
 			}
 		} else {
@@ -329,7 +335,7 @@ static int mod_child_init(int rank)
 						return -1; /* error */
 					if (pid==0){
 						close(nsq_worker_pipes_fds[i*2+1]);
-						return(nsq_consumer_worker_proc(nsq_worker_pipes_fds[i*2], tc->topic, tc->channel));
+						return(nsq_consumer_worker_proc(nsq_worker_pipes_fds[i*2], tc->topic, tc->channel, max_in_flight));
 					}
 				}
 				tc = tc->next;