// cppsp_standalone.C
  1. /*
  2. This program is free software: you can redistribute it and/or modify
  3. it under the terms of the GNU General Public License as published by
  4. the Free Software Foundation, either version 3 of the License, or
  5. (at your option) any later version.
  6. This program is distributed in the hope that it will be useful,
  7. but WITHOUT ANY WARRANTY; without even the implied warranty of
  8. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  9. GNU General Public License for more details.
  10. You should have received a copy of the GNU General Public License
  11. along with this program. If not, see <http://www.gnu.org/licenses/>.
  12. * */
  13. #include <cpoll/cpoll.H>
  14. #include <unistd.h>
  15. #include <iostream>
  16. #include <signal.h>
  17. #include <cppsp/page.H>
  18. #include <cppsp/cppsp_cpoll.H>
  19. #include <cppsp/common.H>
  20. #include <assert.h>
  21. #include <sys/syscall.h> //SYS_gettid
  22. #include "server.C"
  // Prints "sizeof(<x>) = <bytes>" to stdout. sizeof yields a size_t, so the
  // conversion must be %zu (the previous %i mismatched the argument type).
  #define PRINTSIZE(x) printf("sizeof(" #x ") = %zu\n", sizeof(x))
  // Writes a printf-style message to stderr wrapped in ANSI escape sequences
  // (1;33 on 41), followed by a reset sequence and a newline.
  // x must be a string literal because it is concatenated with the escapes.
  #define printerr(x, ...) fprintf(stderr, "\x1B[41;1;33m" x "\x1B[0;0;0m\n", ##__VA_ARGS__)
  // Same as printerr, but wrapped in the "1;1;1" escape sequence instead.
  #define printinfo(x, ...) fprintf(stderr, "\x1B[1;1;1m" x "\x1B[0;0;0m\n", ##__VA_ARGS__)
  // Fallback used only when the system headers do not define SO_REUSEPORT.
  #ifndef SO_REUSEPORT
  #define SO_REUSEPORT 15
  #endif
  // Backlog passed to listen() on every listening socket.
  #define CPPSP_LISTEN_BACKLOG 256
  30. using namespace std;
  31. using namespace CP;
  32. using namespace cppsp;
  33. using namespace RGC;
  // Root directory passed to every worker's cppspServer::Server (see the
  // workerThread constructor). main() sets it to the current working directory
  // and overwrites it when the -r option is given.
  34. string rootDir;
  // Forward declaration; the definition is at the end of this file. Walks argv and
  // calls cb once per argument: with the option name (text after the leading '-')
  // or with NULL for a non-option argument, plus a callable that yields a value.
  35. void parseArgs(int argc, char** argv, const function<void(char*, const function<char*()>&)>& cb);
  36. struct workerThread
  37. {
  38. cppspServer::Server srv;
  39. vector<const char*> modules;
  40. Ref<CP::Socket> listenSock;
  41. union {
  42. pid_t pid;
  43. pthread_t thread;
  44. };
  45. int threadid;
  46. int cpu; //id of cpu to pin to, or -1
  47. workerThread(Socket& sock): srv(rootDir.c_str()),
  48. listenSock(sock),cpu(-1){}
  49. };
  50. class handler1: public RGC::Allocator
  51. {
  52. public:
  53. Socket sock;
  54. cppspServer::handler h;
  55. handler1(cppspServer::Server& thr,CP::Poll& poll,HANDLE s,int d,int t,int p):
  56. sock(s,d,t,p),h(thr,poll,sock) {
  57. h.allocator=this;
  58. }
  59. void* alloc(int s) { return NULL; }
  60. void dealloc(void* ptr) {
  61. sock.~Socket();
  62. if(allocator==NULL)free(this);
  63. else allocator->dealloc(this);
  64. }
  65. };
  /**
   * Restricts the calling thread to a single CPU.
   * Failure is reported with perror() and otherwise ignored.
   *
   * @param cpu index of the CPU to run on
   */
  void pinToCPU(int cpu) {
    cpu_set_t mask;
    CPU_ZERO(&mask);
    CPU_SET(cpu, &mask);

    // Target the kernel thread id of the caller, not the whole process.
    const pid_t tid = (pid_t) syscall(SYS_gettid);
    const int rc = sched_setaffinity(tid, sizeof(mask), &mask);
    if (rc == 0) return;
    perror("sched_setaffinity");
  }
  73. void* thread1(void* v) {
  74. workerThread& thr=*(workerThread*)v;
  75. cppspServer::Server& srv=thr.srv;
  76. Poll p;
  77. if(thr.cpu>=0) pinToCPU(thr.cpu);
  78. /*
  79. p.add(thr->efd);
  80. struct {
  81. Poll& p;
  82. serverThread* thr;
  83. void operator()(eventfd_t eventcount) {
  84. cppsp_request* req;
  85. while((req=thr->req_queue.beginDequeue())!=NULL) {
  86. Socket* sock=new Socket(req->fd,listensock.addressFamily,
  87. listensock.type,listensock.protocol);
  88. //printf("new socket: %p\n",sock);
  89. p.add(*sock);
  90. processRequest(*thr,p,*sock);
  91. sock->release();
  92. thr->req_queue.endDequeue();
  93. }
  94. }
  95. } cb {p, thr};
  96. thr->efd.repeatGetEvent(&cb);*/
  97. MemoryPool handlerPool(sizeof(handler1),256);
  98. struct {
  99. Poll& p;
  100. workerThread& thr;
  101. MemoryPool& handlerPool;
  102. int reqn;
  103. void operator()(HANDLE sock) {
  104. //printf("thread %i: accepted socket: %p (%i)\n",thr->threadid,sock,sock->handle);
  105. handler1* hdlr=new (handlerPool.alloc())
  106. handler1(thr.srv,p,sock,thr.listenSock->addressFamily,
  107. thr.listenSock->type,thr.listenSock->protocol);
  108. hdlr->allocator=&handlerPool;
  109. if(++reqn>10) {
  110. reqn=0;
  111. sched_yield();
  112. }
  113. }
  114. } cb {p, thr, handlerPool, 0};
  115. p.add(*thr.listenSock);
  116. Timer t((uint64_t)2000);
  117. struct {
  118. cppspServer::Server& srv;
  119. void operator()(int count) {
  120. srv.updateTime();
  121. }
  122. } cb1 {srv};
  123. t.setCallback(&cb1);
  124. p.add(t);
  125. int modsLeft;
  126. struct {
  127. int& modsLeft;
  128. workerThread& thr;
  129. Delegate<void(HANDLE)> cb;
  130. void operator()() {
  131. if(--modsLeft == 0) {
  132. thr.listenSock->repeatAcceptHandle(cb);
  133. }
  134. }
  135. } afterModuleLoad {modsLeft,thr,&cb};
  136. struct {
  137. const char* s;
  138. Delegate<void()> afterModuleLoad;
  139. void operator()(void*,exception* ex) {
  140. if(ex!=NULL) {
  141. fprintf(stderr,"error loading module %s: %s\n",s,ex->what());
  142. cppsp::CompileException* ce = dynamic_cast<cppsp::CompileException*>(ex);
  143. if (ce != NULL) {
  144. printf("%s\n",ce->compilerOutput.c_str());
  145. }
  146. }
  147. afterModuleLoad();
  148. }
  149. } moduleCB[thr.modules.size()];
  150. modsLeft=thr.modules.size();
  151. for(int ii=0;ii<(int)thr.modules.size();ii++) {
  152. moduleCB[ii].s=thr.modules[ii];
  153. moduleCB[ii].afterModuleLoad=&afterModuleLoad;
  154. thr.srv.loadModule(p,thr.modules[ii],&moduleCB[ii]);
  155. }
  156. if(thr.modules.size()==0) thr.listenSock->repeatAcceptHandle(&cb);
  157. p.loop();
  158. return NULL;
  159. }
  160. CP::Socket listensock;
  161. int main(int argc, char** argv) {
  162. {
  163. char cwd[255];
  164. if(getcwd(cwd,255)==NULL) throw runtime_error(strerror(errno));
  165. rootDir=cwd;
  166. }
  167. string listen="0.0.0.0:80";
  168. int threads=-1;
  169. bool f0rk=false;
  170. vector<string> cxxopts;
  171. vector<const char*> modules;
  172. bool reusePort=true;
  173. bool setAffinity=false;
  174. try {
  175. parseArgs(argc, argv,
  176. [&](char* name, const std::function<char*()>& getvalue)
  177. {
  178. if(name==NULL) goto help;
  179. if(strcmp(name,"r")==0) {
  180. rootDir=getvalue();
  181. } else if(strcmp(name,"c")==0) {
  182. cxxopts.push_back(getvalue());
  183. } else if(strcmp(name,"g")==0) {
  184. cppsp::gxx=getvalue();
  185. } else if(strcmp(name,"l")==0) {
  186. listen=getvalue();
  187. } else if(strcmp(name,"t")==0) {
  188. threads=atoi(getvalue());
  189. } else if(strcmp(name,"m")==0) {
  190. modules.push_back(getvalue());
  191. } else if(strcmp(name,"f")==0) {
  192. f0rk=true;
  193. } else if(strcmp(name,"s")==0) {
  194. reusePort=false;
  195. } else if(strcmp(name,"a")==0) {
  196. setAffinity=true;
  197. } else {
  198. help:
  199. fprintf(stderr,"usage: %s [options]...\noptions:\n"
  200. "\t-l <host:port>: listen on specified host:port (default: 0.0.0.0:80)\n"
  201. "\t-g <option>: specify the C++ compiler (default: g++)\n"
  202. "\t-c <option>: specify a compiler option to be passed to g++\n"
  203. "\t-m <path>: load a cppsp module (path is relative to root)\n"
  204. "\t-r <root>: set root directory (must be absolute) (default: $(pwd))\n"
  205. "\t-t <threads>: # of worker processes/threads to start up (default: sysconf(_SC_NPROCESSORS_CONF))\n"
  206. "\t-f: use multi-processing (forking) instead of multi-threading (pthreads)\n"
  207. "\t-a: automatically set cpu affinity for the created worker threads/processes\n",argv[0]);
  208. exit(1);
  209. }
  210. });
  211. } catch(exception& ex) {
  212. printerr("error: %s\nspecify -? for help",ex.what());
  213. return 1;
  214. }
  215. printinfo("specify -? for help");
  216. auto i=listen.find(':');
  217. if(i==string::npos) throw runtime_error("expected \":\" in listen");
  218. int cpus=(int)sysconf(_SC_NPROCESSORS_CONF);
  219. if(threads<0)threads=cpus;
  220. if(setAffinity) {
  221. if(threads > cpus && (threads%(int)sysconf(_SC_NPROCESSORS_CONF) != 0)) {
  222. printerr("warning: cpu affinity is to be set; thread count larger than and not divisible by cpu count");
  223. }
  224. }
  225. EndPoint* ep=NULL;
  226. struct {
  227. bool& reusePort;
  228. void operator()(int s) {
  229. int optval = 1;
  230. reusePort=(setsockopt(s, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval))==0);
  231. }
  232. } initsock {reusePort};
  233. if(reusePort)
  234. listensock.bind(listen.substr(0,i).c_str(),
  235. listen.substr(i + 1, listen.length() - i - 1).c_str(), AF_UNSPEC, SOCK_STREAM,0,0,&initsock);
  236. else
  237. listensock.bind(listen.substr(0,i).c_str(),
  238. listen.substr(i + 1, listen.length() - i - 1).c_str(), AF_UNSPEC, SOCK_STREAM);
  239. if(reusePort) {
  240. printinfo("using SO_REUSEPORT");
  241. ep=listensock.getLocalEndPoint();
  242. } else {
  243. printerr("NOT using SO_REUSEPORT");
  244. listensock.listen(CPPSP_LISTEN_BACKLOG);
  245. }
  246. //p.add(listensock);
  247. PRINTSIZE(CP::Socket);
  248. PRINTSIZE(cppspServer::handler);
  249. PRINTSIZE(handler1);
  250. if(f0rk) printinfo("starting %i processes",threads);
  251. else printinfo("starting %i threads",threads);
  252. workerThread* th=(workerThread*)new char[sizeof(workerThread)*threads];
  253. for(int i=0;i<threads;i++) {
  254. int cpu=i%cpus;
  255. Socket* newsock;
  256. if(reusePort) {
  257. newsock=new Socket(listensock.addressFamily, listensock.type, listensock.protocol);
  258. int optval = 1;
  259. assert(setsockopt(newsock->handle, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval))==0);
  260. newsock->bind(*ep);
  261. newsock->listen(CPPSP_LISTEN_BACKLOG);
  262. } else {
  263. newsock=new Socket(f0rk ? listensock.handle : dup(listensock.handle),
  264. listensock.addressFamily, listensock.type, listensock.protocol);
  265. }
  266. workerThread& tmp=*(new (th+i) workerThread(*newsock));
  267. tmp.cpu=(setAffinity?cpu:-1);
  268. newsock->release();
  269. CXXOpts(tmp.srv.mgr)=cxxopts;
  270. tmp.modules=modules;
  271. tmp.threadid=i+1;
  272. if(threads==1) {
  273. thread1(&tmp);
  274. return 0;
  275. }
  276. if(f0rk) {
  277. pid_t pid=fork();
  278. if(pid==0) {
  279. tmp.pid=getpid();
  280. srand(int(tmp.pid)^(int)time(NULL));
  281. thread1(&tmp);
  282. return 0;
  283. } else if(pid>0) {
  284. tmp.pid=pid;
  285. //delete newsock;
  286. } else {
  287. perror("fork");
  288. return 1;
  289. }
  290. } else {
  291. if (pthread_create(&tmp.thread, NULL, thread1, &tmp) != 0) {
  292. throw runtime_error(strerror(errno));
  293. }
  294. }
  295. }
  296. if(f0rk) {
  297. static workerThread* _threads;
  298. static int _threadcount;
  299. struct sig_handler
  300. {
  301. static void a(int sig) {
  302. for(int i=0;i<_threadcount;i++) {
  303. kill(_threads[i].pid, 9);
  304. }
  305. exit(0);
  306. }
  307. };
  308. _threads=th;
  309. _threadcount=threads;
  310. struct sigaction sa;
  311. sa.sa_handler = &sig_handler::a;
  312. sigemptyset(&sa.sa_mask);
  313. sigaction(SIGINT, &sa, NULL);
  314. sigaction(SIGTERM, &sa, NULL);
  315. sigaction(SIGSEGV, &sa, NULL);
  316. }
  317. while(1)sleep(3600);
  318. }
  /**
   * Walks argv[1..argc) and reports every argument to cb.
   *
   * - An argument starting with '-' is reported as cb(name, getvalue), where
   *   name is the text after the '-' and getvalue() consumes and returns the
   *   following argv entry (throwing std::logic_error if there is none).
   * - Any other non-empty argument is reported as cb(NULL, getter), where
   *   getter() returns that argument itself.
   * - Empty arguments are skipped.
   *
   * @param argc number of entries in argv
   * @param argv argument vector; argv[0] is ignored
   * @param cb   receiver invoked once per reported argument
   */
  void parseArgs(int argc, char** argv,
      const std::function<void(char*, const std::function<char*()>&)>& cb) {
    int pos = 1;
    // Shared by all option callbacks: steps the cursor onto the option's value.
    std::function<char*()> nextValue = [&]() -> char* {
      if (pos + 1 >= argc)
        throw std::logic_error(std::string(argv[pos]) + " requires an argument");
      ++pos;
      return argv[pos];
    };
    while (pos < argc) {
      char* const arg = argv[pos];
      if (arg[0] != '\x00') {
        if (arg[0] == '-') {
          cb(arg + 1, nextValue);
        } else {
          cb(NULL, [arg]() { return arg; });
        }
      }
      // pos may have been advanced by nextValue(); move past whatever is current.
      ++pos;
    }
  }