| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220 | /** * Copyright (C) 2014 Daniel-Constantin Mierla (asipto.com) * * This file is part of Extensible SIP Router, a free SIP server. * * Permission to use, copy, modify, and distribute this software for any * purpose with or without fee is hereby granted, provided that the above * copyright notice and this permission notice appear in all copies. * * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */#include <stdio.h>#include <unistd.h>#include <stdlib.h>#include <string.h>#include <sys/socket.h>#include <sys/types.h>#include <sys/un.h>#include <netinet/in.h>#include <arpa/inet.h>#include <fcntl.h>#include <errno.h>#include "dprint.h"#include "sr_module.h"#include "ut.h"#include "pt.h"#include "cfg/cfg_struct.h"#include "async_task.h"static int _async_task_workers = 0;static int _async_task_sockets[2];int async_task_run(int idx);/** * */int async_task_init_sockets(void){	if (socketpair(PF_UNIX, SOCK_DGRAM, 0, _async_task_sockets) < 0) {		LM_ERR("opening tasks dgram socket pair\n");		return -1;	}	LM_DBG("inter-process event notification sockets initialized\n");	return 0;}/** * */void async_task_close_sockets_child(void){	LM_DBG("closing the notification socket used by children\n");	close(_async_task_sockets[1]);}/** * */void async_task_close_sockets_parent(void){	LM_DBG("closing the notification socket used by parent\n");	close(_async_task_sockets[0]);}/** * */int async_task_init(void){	LM_DBG("start initializing asynk task framework\n");	if(_async_task_workers<=0)		return 0;	/* advertise new processes to core */	register_procs(_async_task_workers);	/* advertise new processes to cfg framework */	cfg_register_child(_async_task_workers);	return 0;}/** * */int async_task_initialized(void){	if(_async_task_workers<=0)		return 0;	return 1;}/** * */int async_task_child_init(int rank){	int pid;	int i;	if(_async_task_workers<=0)		return 0;	LM_DBG("child initializing asynk task framework\n");	if (rank==PROC_INIT) {		if(async_task_init_sockets()<0) {			LM_ERR("failed to initialize tasks sockets\n");			return -1;		}		return 0;	}	if(rank>0) {		async_task_close_sockets_parent();		return 0;	}	if (rank!=PROC_MAIN)		return 0;	for(i=0; i<_async_task_workers; i++) {		pid=fork_process(PROC_RPC, "Async Task Worker", 1);		if (pid<0)			return -1; /* error */		if(pid==0) {			/* child */			/* initialize the config framework */			if (cfg_child_init())				return -1;			/* main function for workers */			if(async_task_run(i+1)<0) {				LM_ERR("failed to initialize task worker process: %d\n", i);				return -1;			}		}	}	return 0;}/** * */int async_task_set_workers(int n){	if(_async_task_workers>0) {		LM_WARN("task workers already set\n");		return 0;	}	if(n<=0)		return 0;	_async_task_workers = n;	return 0;}/** * */int async_task_push(async_task_t *task){	int len;	if(_async_task_workers<=0)		return 0;	len = write(_async_task_sockets[1], &task, sizeof(async_task_t*));	if(len<=0) {		LM_ERR("failed to pass the task to asynk workers\n");		return -1;	}	LM_DBG("task sent [%p]\n", task);	return 0;}/** * */int async_task_run(int idx){	async_task_t *ptask;	int received;	LM_DBG("async task worker %d ready\n", idx);	for( ; ; ) {		if ((received = recvfrom(_async_task_sockets[0],							&ptask, sizeof(async_task_t*),							0, NULL, 0)) < 0) {			LM_ERR("failed to received task (%d: %s)\n", errno, strerror(errno));			continue;		}		if(received != sizeof(async_task_t*)) {			LM_ERR("invalid task size %d\n", received);			continue;		}		if(ptask->exec!=NULL) {			LM_DBG("task executed [%p] (%p/%p)\n", ptask,					ptask->exec, ptask->param);			ptask->exec(ptask->param);		}		shm_free(ptask);	}	return 0;}
 |