|
@@ -159,7 +159,6 @@ static int fire_init_event(int rank)
|
|
|
|
|
|
static int mod_init(void)
|
|
|
{
|
|
|
- int i;
|
|
|
startup_time = (int) time(NULL);
|
|
|
|
|
|
if (dbn_pua_mode == 1) {
|
|
@@ -208,26 +207,6 @@ static int mod_init(void)
|
|
|
register_procs(total_workers);
|
|
|
cfg_register_child(total_workers);
|
|
|
|
|
|
- if (pipe(nsq_cmd_pipe_fds) < 0) {
|
|
|
- LM_ERR("cmd pipe() failed\n");
|
|
|
- return -1;
|
|
|
- }
|
|
|
-
|
|
|
- nsq_worker_pipes_fds = (int*) shm_malloc(sizeof(int) * (dbn_consumer_workers) * 2 );
|
|
|
- nsq_worker_pipes = (int*) shm_malloc(sizeof(int) * dbn_consumer_workers);
|
|
|
- for (i=0; i < dbn_consumer_workers; i++) {
|
|
|
- nsq_worker_pipes_fds[i*2] = nsq_worker_pipes_fds[i*2+1] = -1;
|
|
|
- if (pipe(&nsq_worker_pipes_fds[i*2]) < 0) {
|
|
|
- LM_ERR("worker pipe(%d) failed\n", i);
|
|
|
- return -1;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- nsq_cmd_pipe = nsq_cmd_pipe_fds[1];
|
|
|
- for (i=0; i < dbn_consumer_workers; i++) {
|
|
|
- nsq_worker_pipes[i] = nsq_worker_pipes_fds[i*2+1];
|
|
|
- }
|
|
|
-
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
@@ -240,25 +219,10 @@ int mod_register(char *path, int *dlflags, void *p1, void *p2)
|
|
|
return register_trans_mod(path, mod_trans);
|
|
|
}
|
|
|
|
|
|
-
|
|
|
-int set_non_blocking(int fd)
|
|
|
-{
|
|
|
- int flags;
|
|
|
-
|
|
|
- flags = fcntl(fd, F_GETFL);
|
|
|
- if (flags < 0)
|
|
|
- return flags;
|
|
|
- flags |= O_NONBLOCK;
|
|
|
- if (fcntl(fd, F_SETFL, flags) < 0)
|
|
|
- return -1;
|
|
|
-
|
|
|
- return 0;
|
|
|
-}
|
|
|
-
|
|
|
/**
|
|
|
*
|
|
|
*/
|
|
|
-int nsq_consumer_worker_proc(int cmd_pipe, char *topic, char *channel, int max_in_flight)
|
|
|
+void nsq_consumer_worker_proc(char *topic, char *channel, int max_in_flight)
|
|
|
{
|
|
|
struct ev_loop *loop;
|
|
|
loop = ev_default_loop(0);
|
|
@@ -269,7 +233,6 @@ int nsq_consumer_worker_proc(int cmd_pipe, char *topic, char *channel, int max_i
|
|
|
if (loop == NULL) {
|
|
|
LM_ERR("cannot get libev loop\n");
|
|
|
}
|
|
|
- set_non_blocking(cmd_pipe);
|
|
|
|
|
|
LM_DBG("NSQ Worker connecting to NSQ Topic [%s] and NSQ Channel [%s]\n", topic, channel);
|
|
|
// setup the reader
|
|
@@ -285,7 +248,6 @@ int nsq_consumer_worker_proc(int cmd_pipe, char *topic, char *channel, int max_i
|
|
|
}
|
|
|
|
|
|
nsq_run(loop);
|
|
|
- return 0;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -318,8 +280,10 @@ static int mod_child_init(int rank)
|
|
|
if (pid<0)
|
|
|
return -1; /* error */
|
|
|
if (pid==0){
|
|
|
- close(nsq_worker_pipes_fds[i*2+1]);
|
|
|
- return(nsq_consumer_worker_proc(nsq_worker_pipes_fds[i*2], DEFAULT_TOPIC, DEFAULT_CHANNEL, max_in_flight));
|
|
|
+ if (cfg_child_init()) return -1;
|
|
|
+ nsq_consumer_worker_proc(DEFAULT_TOPIC, DEFAULT_CHANNEL, max_in_flight);
|
|
|
+ LM_CRIT("nsq_consumer_worker_proc():: worker_process finished without exit!\n");
|
|
|
+ exit(-1);
|
|
|
}
|
|
|
}
|
|
|
} else {
|
|
@@ -329,8 +293,10 @@ static int mod_child_init(int rank)
|
|
|
if (pid<0)
|
|
|
return -1; /* error */
|
|
|
if (pid==0){
|
|
|
- close(nsq_worker_pipes_fds[i*2+1]);
|
|
|
- return(nsq_consumer_worker_proc(nsq_worker_pipes_fds[i*2], tc->topic, tc->channel, max_in_flight));
|
|
|
+ if (cfg_child_init()) return -1;
|
|
|
+ nsq_consumer_worker_proc(tc->topic, tc->channel, max_in_flight);
|
|
|
+ LM_CRIT("nsq_consumer_worker_proc():: worker_process finished without exit!\n");
|
|
|
+ exit(-1);
|
|
|
}
|
|
|
}
|
|
|
tc = tc->next;
|
|
@@ -367,6 +333,4 @@ static int mod_child_init(int rank)
|
|
|
*/
|
|
|
static void mod_destroy(void) {
|
|
|
free_tc_list(tc_list);
|
|
|
- shm_free(nsq_worker_pipes_fds);
|
|
|
- shm_free(nsq_worker_pipes);
|
|
|
}
|