socketd.C 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799
  1. /*
  2. * config.cpp
  3. *
  4. * Created on: 2011-05-20
  5. * Author: xaxaxa
  6. *
  7. *
  8. */
  9. /*
  10. This program is free software: you can redistribute it and/or modify
  11. it under the terms of the GNU General Public License as published by
  12. the Free Software Foundation, either version 3 of the License, or
  13. (at your option) any later version.
  14. This program is distributed in the hope that it will be useful,
  15. but WITHOUT ANY WARRANTY; without even the implied warranty of
  16. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  17. GNU General Public License for more details.
  18. You should have received a copy of the GNU General Public License
  19. along with this program. If not, see <http://www.gnu.org/licenses/>.
  20. * */
  21. #define _ISOC11_SOURCE
  22. #include "include/socketd_internal.H"
  23. #include <cpoll/sendfd.H>
  24. #include <cpoll/statemachines.H>
  25. #include <stdexcept>
  26. #include <unistd.h>
  27. #include <stdint.h>
  28. #include <rgc.H>
  29. #include <signal.h>
  30. #include <tuple>
  31. #include <ctype.h>
  32. #include <delegate.H>
  33. #include <set>
  34. #include <stdlib.h>
  35. #include <math.h>
  // Prints "sizeof(<x>) = <bytes>" to stdout. sizeof yields a size_t, so the
  // conversion must be %zu (the original %i mismatched the argument type).
  #define PRINTSIZE(x) printf("sizeof(" #x ") = %zu\n", sizeof(x))
  37. #define SOCKETD_READBUFFER 256
  38. //maximum length of user-controlled data on the stack (for example, http header names)
  39. //in order to prevent stack overflow attacks
  40. #define SOCKETD_STACKBUFFER 256
  41. using namespace std;
  42. using namespace CP;
  //8: debug; 5: info; 3: warn; 2: err; 1: fatal
  //Messages whose LEVEL is above 5 are suppressed. The do { } while (0) wrapper makes the
  //expansion a single statement, so "if (x) SOCKETD_DEBUG(...); else ..." binds the else
  //to the caller's if rather than to the if inside the macro.
  #define SOCKETD_DEBUG(LEVEL, ...) do { if ((LEVEL) <= 5) printf(__VA_ARGS__); } while (0)
  45. //#define SOCKETD_DEBUG(...) /* __VA_ARGS__ */
  46. namespace socketd
  47. {
  48. static int asdf = 0;
  49. //static const int rBufSize = 4096;
  50. //static const int rLineBufSize = 512;
  51. void spawnApp(vhost* vh, CP::Poll& p, string exepath, int threadID, int i);
  52. bool comparePath(const char* conf, int confLen, const char* path, int pathLen) {
  53. SOCKETD_DEBUG(10, "comparePath: conf=%s; path=%s\n", string(conf, confLen).c_str(),
  54. string(path, pathLen).c_str());
  55. //cout << string(path, pathLen) << endl;
  56. if (confLen == pathLen && memcmp(conf, path, confLen) == 0) {
  57. /*cout << "matched (exact): " << string(path, pathLen) << " against " << string(conf, confLen)
  58. << endl;*/
  59. return true;
  60. }
  61. if (pathLen > confLen && memmem(path, pathLen, conf, confLen) == path
  62. && (confLen == 1 || path[confLen] == '/')) {
  63. /*cout << "matched (substring): " << string(path, pathLen) << " against " << string(conf, confLen)
  64. << endl;*/
  65. return true;
  66. }
  67. //
  68. return false;
  69. }
  /**
   * Tests whether a host name matches a configured host pattern.
   *
   * The pattern matches if it is byte-for-byte equal to the host, or if it starts
   * with '*' and the rest of the pattern is a suffix of the host ("*.example.com"
   * matches "www.example.com"; a bare "*" matches every host).
   *
   * @param conf     configured pattern (not necessarily NUL-terminated)
   * @param confLen  length of conf in bytes
   * @param host     host name to test (not necessarily NUL-terminated)
   * @param hostLen  length of host in bytes
   * @return true if host matches conf
   */
  bool compareHost(const char* conf, int confLen, const char* host, int hostLen) {
      if (confLen == hostLen && memcmp(conf, host, confLen) == 0) return true;
      if (confLen >= 1 && conf[0] == '*') {
          const int suffixLen = confLen - 1;
          if (hostLen < suffixLen) return false;
          //compare the tail of the host directly; searching for the first occurrence
          //of the suffix misses hosts that contain it more than once, and a failed
          //search yields NULL, which must not be used in pointer arithmetic
          return memcmp(host + (hostLen - suffixLen), conf + 1, suffixLen) == 0;
      }
      return false;
  }
  82. SocketDException::SocketDException() :
  83. message(strerror(errno)), number(errno) {
  84. }
  85. SocketDException::SocketDException(int32_t number) :
  86. message(strerror(number)), number(number) {
  87. }
  88. SocketDException::SocketDException(string message, int32_t number) :
  89. message(message), number(number) {
  90. }
  91. SocketDException::~SocketDException() throw () {
  92. }
  93. const char* SocketDException::what() const throw () {
  94. return message.c_str();
  95. }
  /**
   * Upper-cases a character buffer in place.
   *
   * Conversion stops after len characters or at the first NUL byte, whichever comes
   * first, so buffers without a terminator are never overrun.
   *
   * @param s    buffer to convert
   * @param len  maximum number of characters to convert
   * @return s
   */
  char* strupper(char* s, int len) {
      for (int i = 0; i < len && s[i] != '\0'; i++) {
          //toupper() is only defined for unsigned char values and EOF
          s[i] = (char) toupper((unsigned char) s[i]);
      }
      return s;
  }
  /**
   * Lower-cases a character buffer in place.
   *
   * Conversion stops after len characters or at the first NUL byte, whichever comes
   * first, so buffers without a terminator are never overrun.
   *
   * @param s    buffer to convert
   * @param len  maximum number of characters to convert
   * @return s
   */
  char* strlower(char* s, int len) {
      for (int i = 0; i < len && s[i] != '\0'; i++) {
          //tolower() is only defined for unsigned char values and EOF
          s[i] = (char) tolower((unsigned char) s[i]);
      }
      return s;
  }
  /**
   * Case-insensitively compares the first l bytes of two buffers.
   *
   * @param s1  first buffer (at least l bytes)
   * @param s2  second buffer (at least l bytes)
   * @param l   number of bytes to compare
   * @return true if all l byte pairs are equal ignoring case (always true for l <= 0)
   */
  bool compareStringCI(const char* s1, const char* s2, int l) {
      for (int i = 0; i < l; i++) {
          //bytes >= 0x80 are negative when char is signed; tolower() requires unsigned char values
          if (tolower((unsigned char) s1[i]) != tolower((unsigned char) s2[i])) return false;
      }
      return true;
  }
  112. bool compareStringCI(const char* s1, int l1, const char* s2) {
  113. int l2 = strlen(s2);
  114. if (l1 != l2) return false;
  115. return compareStringCI(s1, s2, l1);
  116. }
  117. int& getCurProcess(vhost* vh, socketd* This, int threadID) {
  118. uint8_t* data = vh->perCPUData[threadID];
  119. uint8_t* tmp = data + sizeof(appConnection*) * vh->_processes;
  120. return *(int*) tmp;
  121. }
  122. appConnection* getConn(vhost* vh, int threadID, int i) {
  123. uint8_t* data = vh->perCPUData[threadID];
  124. uint8_t* tmp = data + sizeof(appConnection*) * i;
  125. return *(appConnection**) tmp;
  126. }
  127. void setConn(vhost* vh, int threadID, int i, appConnection* c) {
  128. uint8_t* data = vh->perCPUData[threadID];
  129. uint8_t* tmp = data + sizeof(appConnection*) * i;
  130. appConnection*& conn = *(appConnection**) tmp;
  131. if (conn != NULL) conn->release();
  132. conn = c;
  133. if (conn != NULL) conn->retain();
  134. }
  135. struct connectionInfo
  136. {
  137. socketd* This;
  138. listen* l;
  139. CP::Socket s;
  140. CP::Poll* p;
  141. vhost* tmp_vh;
  142. appConnection* tmpptr;
  143. bool* deletionFlag;
  144. //CP::streamReader* sr;
  145. char _sr[sizeof(CP::newPersistentStreamReader)];
  146. //int lineBufLen;
  147. const char* httpPath;
  148. const char* httpHost;
  149. int httpPathLength;
  150. int httpHostLength;
  151. int tries;
  152. //0: none; 1: reqLine; 2: headers
  153. int readTo;
  154. int pos;
  155. int threadID;
  156. int processIndex;
  157. bool firstLine;
  158. bool reading;
  159. bool cancelread;
  160. bool shouldDelete;
  161. bool streamReaderInit;
  162. int& getCurProcess(vhost* vh) {
  163. return ::socketd::getCurProcess(vh, This, threadID);
  164. }
  165. appConnection* getConn(vhost* vh, int i) {
  166. uint8_t* data = vh->perCPUData[threadID];
  167. uint8_t* tmp = data + sizeof(appConnection*) * i;
  168. return *(appConnection**) tmp;
  169. }
  170. void setConn(vhost* vh, int i, appConnection* c) {
  171. uint8_t* data = vh->perCPUData[threadID];
  172. uint8_t* tmp = data + sizeof(appConnection*) * i;
  173. appConnection*& conn = *(appConnection**) tmp;
  174. if (conn != NULL) conn->release();
  175. conn = c;
  176. if (conn != NULL) conn->retain();
  177. }
  178. connectionInfo(int fd, int d, int t, int p) :
  179. s(fd, d, t, p), deletionFlag(NULL), tries(0), readTo(0), pos(0), processIndex(-1),
  180. shouldDelete(false), streamReaderInit(false) {
  181. }
  182. void startRead();
  183. void checkMatch();
  184. void startSocketRead();
  185. void socketReadCB(int r) {
  186. SOCKETD_DEBUG(9, "got %i bytes of data from client socket\n", r);
  187. CP::newPersistentStreamReader* sr = (CP::newPersistentStreamReader*) _sr;
  188. if (r > 0) {
  189. bool d(false);
  190. deletionFlag = &d;
  191. sr->endPutData(r);
  192. newPersistentStreamReader::item it;
  193. if (sr->process(it)) readCB((uint8_t*) it.data.data(), it.data.length());
  194. if (d) return;
  195. deletionFlag = NULL;
  196. }
  197. reading = false;
  198. if (shouldDelete) {
  199. delete this;
  200. return;
  201. }
  202. if (cancelread) return;
  203. if (r <= 0) {
  204. String s = sr->getBufferData();
  205. sr->clearBuffer();
  206. readCB((uint8_t*) s.data(), s.length());
  207. return;
  208. }
  209. startSocketRead();
  210. }
  211. void processLine(uint8_t* buf, int len) {
  212. //uint8_t* lineBuf = ((uint8_t*) sr) + CP::streamReader_getSize() + rBufSize;
  213. uint8_t* lineBuf = buf;
  214. int lineBufLen = len;
  215. SOCKETD_DEBUG(10, "got line: %s\n", string((const char* )lineBuf, lineBufLen).c_str());
  216. //printf("got line: ");
  217. //fflush(stdout);
  218. //write(1, lineBuf, lineBufLen);
  219. //printf("\n");
  220. if (len <= 0) goto fail;
  221. if (firstLine) {
  222. firstLine = false;
  223. uint8_t* tmp = (uint8_t*) memchr(lineBuf, ' ', lineBufLen);
  224. if (tmp == NULL) goto fail;
  225. tmp++;
  226. if (lineBuf + lineBufLen - tmp <= 0) goto fail;
  227. uint8_t* tmp1 = (uint8_t*) memchr(tmp, ' ', lineBuf + lineBufLen - tmp);
  228. if (tmp1 == NULL) goto fail;
  229. const char* path = (const char*) tmp;
  230. int pathLen = tmp1 - tmp;
  231. if (pathLen <= 0) goto fail;
  232. pos = 1;
  233. httpPath = path;
  234. httpPathLength = pathLen;
  235. SOCKETD_DEBUG(10, "got httpPath: %s\n", string(httpPath, httpPathLength).c_str());
  236. checkMatch();
  237. return;
  238. }
  239. const uint8_t* tmp;
  240. const uint8_t* end;
  241. end = buf + len;
  242. tmp = (const uint8_t*) memchr(buf, ':', len);
  243. if (tmp == NULL) goto cont;
  244. int i;
  245. i = tmp - buf;
  246. if (i > SOCKETD_STACKBUFFER) goto fail;
  247. if (compareStringCI((const char*) buf, i, "host")) {
  248. tmp++;
  249. while (tmp < end && *tmp == ' ')
  250. tmp++;
  251. if (tmp >= end) goto fail;
  252. httpHost = (const char*) tmp;
  253. httpHostLength = end - tmp;
  254. SOCKETD_DEBUG(10, "got httpHost: %s\n", string(httpHost, httpHostLength).c_str());
  255. pos = 2;
  256. checkMatch();
  257. return;
  258. }
  259. cont: startRead();
  260. return;
  261. fail: delete this;
  262. }
  263. void readCB(uint8_t* buf, int len) {
  264. /*uint8_t* lineBuf;
  265. if (len <= 0) goto aaa;
  266. //overflowed the line buffer
  267. if (lineBufLen + len > rLineBufSize) goto fail;
  268. lineBuf = ((uint8_t*) sr) + CP::streamReader_getSize() + rBufSize;
  269. memcpy(lineBuf + lineBufLen, buf, len);
  270. lineBufLen += len;
  271. aaa: if (last) {
  272. cancelread = true;
  273. processLine();
  274. }
  275. return;
  276. fail: delete this;*/
  277. cancelread = true;
  278. processLine(buf, len);
  279. }
  280. inline int connIndex(vhost* vh) {
  281. return threadID * vh->_processes + processIndex;
  282. }
  283. void attachmentCB(bool b) {
  284. if (b) {
  285. SOCKETD_DEBUG(8, "received acknownedgement for connection %p (with attachment)\n",
  286. this);
  287. delete this;
  288. } else {
  289. do_transfer(tmp_vh);
  290. }
  291. }
  292. void appCB(bool b) {
  293. if (b) {
  294. SOCKETD_DEBUG(8, "received acknownedgement for connection %p\n", this);
  295. delete this;
  296. } else {
  297. if (tmpptr == getConn(tmp_vh, processIndex)) {
  298. getConn(tmp_vh, processIndex)->shutDown();
  299. setConn(tmp_vh, processIndex, NULL);
  300. }
  301. do_transfer(tmp_vh);
  302. }
  303. }
  304. //transfer socket to application
  305. void do_transfer(vhost* vh) {
  306. //cout << "do_transfer" << endl;
  307. SOCKETD_DEBUG(8, "do_transfer (%p)\n", this);
  308. retry: if ((++tries) > 3) {
  309. SOCKETD_DEBUG(3, "exceeded 3 tries for connection %p\n", this);
  310. if (reading) shouldDelete = true;
  311. else delete this;
  312. return;
  313. }
  314. if (processIndex < 0) {
  315. processIndex = (getCurProcess(vh)++) % vh->_processes;
  316. }
  317. if (getConn(vh, processIndex) == NULL && vh->exepath.length() > 0) {
  318. spawnApp(vh, *p, vh->exepath, threadID, processIndex);
  319. }
  320. uint8_t* buf;
  321. int bufLen;
  322. if (streamReaderInit) {
  323. CP::newPersistentStreamReader* sr = (CP::newPersistentStreamReader*) _sr;
  324. String s = sr->getHistory();
  325. buf = (uint8_t*) s.data();
  326. bufLen = s.length();
  327. } else {
  328. buf = NULL;
  329. bufLen = 0;
  330. }
  331. if (vh->attachmentConn() != NULL) {
  332. tmp_vh = vh;
  333. int r = vh->attachmentConn->passConnection(&s, NULL, 0,
  334. appConnection::passConnCB(&connectionInfo::attachmentCB, this));
  335. if (r == 1) { //fail
  336. goto aaaaa;
  337. } else if (r == 0) { //success
  338. SOCKETD_DEBUG(8, "connection %p pre-succeeded (with attachment)\n", this);
  339. delete this;
  340. return;
  341. } else return;
  342. }
  343. aaaaa: if (getConn(vh, processIndex) != NULL) {
  344. //cout << "vh->conn() != NULL" << endl;
  345. appConnection* tmpptr = getConn(vh, processIndex);
  346. this->tmp_vh = vh;
  347. this->tmpptr = tmpptr;
  348. SOCKETD_DEBUG(8, "bufLen=%i\n", bufLen);
  349. int r = tmpptr->passConnection(&s, buf, bufLen,
  350. appConnection::passConnCB(&connectionInfo::appCB, this));
  351. if (r == 1) {
  352. //application possibly dead; respawn
  353. tmpptr->shutDown();
  354. if (tmpptr == getConn(vh, processIndex)) setConn(vh, processIndex, NULL);
  355. goto retry;
  356. } else if (r == 0) {
  357. SOCKETD_DEBUG(8, "connection %p pre-succeeded\n", this);
  358. delete this;
  359. return;
  360. } else return;
  361. } else {
  362. //no handler found; reset connection
  363. SOCKETD_DEBUG(5, "no handler for connection %p\n", this);
  364. delete this;
  365. }
  366. }
  367. void process() {
  368. SOCKETD_DEBUG(9, "connectionInfo::process()\n");
  369. checkMatch();
  370. }
  371. ~connectionInfo() {
  372. SOCKETD_DEBUG(9, "~connectionInfo (%p)\n", this);
  373. //s.release();
  374. if (streamReaderInit) {
  375. CP::newPersistentStreamReader* sr = (CP::newPersistentStreamReader*) _sr;
  376. sr->~newPersistentStreamReader();
  377. }
  378. if (deletionFlag != NULL) *deletionFlag = true;
  379. }
  380. };
  381. void connectionInfo::startSocketRead() {
  382. if (reading) return;
  383. CP::newPersistentStreamReader* sr = (CP::newPersistentStreamReader*) _sr;
  384. auto tmp = sr->beginPutData(SOCKETD_READBUFFER);
  385. SOCKETD_DEBUG(9, "attempting to read %i bytes of data from client socket\n",
  386. SOCKETD_READBUFFER);
  387. reading = true;
  388. s.read(tmp.data(), SOCKETD_READBUFFER, CP::Callback(&connectionInfo::socketReadCB, this));
  389. }
  390. void connectionInfo::checkMatch() {
  391. //figure out what needs to be read to decide which binding to use
  392. //0: none; 1: reqLine; 2: headers
  393. //int readTo = 0;
  394. if (pos < readTo) {
  395. startRead();
  396. return;
  397. }
  398. SOCKETD_DEBUG(9, "bindings.size() = %i\n", This->bindings.size());
  399. for (uint32_t i = 0; i < This->bindings.size(); i++) {
  400. SOCKETD_DEBUG(9, "This->bindings[i]->listenID = %i\n", This->bindings[i]->listenID);
  401. if (!(This->bindings[i]->matchLevel & This->bindings[i]->match_listenID)
  402. || This->bindings[i]->listenID == l->id) {
  403. if (This->bindings[i]->matchLevel & binding::match_httpPath) {
  404. if (pos < 1) {
  405. readTo = 1;
  406. break;
  407. } else {
  408. if (comparePath(This->bindings[i]->httpPath.data(),
  409. This->bindings[i]->httpPath.length(), httpPath, httpPathLength)) {
  410. goto matched_httpPath;
  411. } else continue;
  412. }
  413. } else {
  414. matched_httpPath: if (This->bindings[i]->matchLevel & binding::match_httpHost) {
  415. if (pos < 2) {
  416. readTo = 2;
  417. break;
  418. } else {
  419. if (comparePath(This->bindings[i]->httpHost.data(),
  420. This->bindings[i]->httpHost.length(), httpHost, httpHostLength)) {
  421. goto matched_httpHost;
  422. } else continue;
  423. }
  424. } else {
  425. matched_httpHost: do_transfer(This->bindings[i]->vh);
  426. return;
  427. }
  428. }
  429. }
  430. }
  431. SOCKETD_DEBUG(9, "readTo=%i pos=%i\n", readTo, pos);
  432. if (readTo > pos) {
  433. if (pos == 0) {
  434. //initialize streamReader
  435. CP::newPersistentStreamReader* sr;
  436. sr = new (_sr) CP::newPersistentStreamReader(SOCKETD_READBUFFER);
  437. streamReaderInit = true;
  438. //if (sr == NULL) goto fail;
  439. //CP::streamReader_init(sr, rBufSize);
  440. firstLine = true;
  441. reading = false;
  442. p->add(s);
  443. }
  444. startRead();
  445. } else goto fail;
  446. return;
  447. fail: if (reading) shouldDelete = true;
  448. else delete this;
  449. }
  450. void connectionInfo::startRead() {
  451. CP::newPersistentStreamReader* sr = (CP::newPersistentStreamReader*) _sr;
  452. sr->readUntilString("\r\n", 2);
  453. newPersistentStreamReader::item it;
  454. if (sr->process(it)) readCB((uint8_t*) it.data.data(), it.data.length());
  455. else {
  456. cancelread = false;
  457. startSocketRead();
  458. }
  459. }
  460. appConnection::appConnection() {
  461. }
  462. appConnection::~appConnection() {
  463. //if(vh!=NULL && vh->conn==this) vh->conn=NULL;
  464. }
  465. struct appConnection_unix: public appConnection
  466. {
  467. RGC::Ref<CP::Socket> unixsock;
  468. //not yet acknowledged
  469. map<int64_t, passConnCB> pendingConnections;
  470. int64_t maxID;
  471. protocolHeader buf;
  472. prot_ackConnection buf1;
  473. CP::Poll& p;
  474. socketd* sd;
  475. set<vhost*> bound_vhosts;
  476. pid_t pid;
  477. //char sbuf[sizeof(protocolHeader)+sizeof(prot_handleConnection)];
  478. bool dead;
  479. bool down;
  480. virtual void shutDown() {
  481. if (!down) {
  482. down = true;
  483. unixsock->close();
  484. kill(pid, 15);
  485. }
  486. }
  487. void die(int64_t ignoreID) {
  488. if (dead) return;
  489. //throw 5;
  490. dead = true;
  491. for (auto it = pendingConnections.begin(); it != pendingConnections.end(); it++) {
  492. if ((*it).first != ignoreID) (*it).second(false);
  493. }
  494. pendingConnections.clear();
  495. shutDown();
  496. }
  497. void startRead();
  498. void ackConnectionCB(int r) {
  499. if (r <= 0) {
  500. die(0);
  501. }
  502. if (dead) {
  503. release();
  504. return;
  505. }
  506. //printf("%i\n",buf1.id);
  507. auto it = pendingConnections.find(buf1.id);
  508. if (it != pendingConnections.end()) {
  509. (*it).second(buf1.success);
  510. pendingConnections.erase(it);
  511. }
  512. startRead();
  513. }
  514. void readCB(int r) {
  515. if (r <= 0) {
  516. SOCKETD_DEBUG(5, "application died; r=%i; errno: %s\n", r, strerror(errno));
  517. die(0);
  518. }
  519. if (dead) {
  520. release();
  521. return;
  522. }
  523. switch (buf.type) {
  524. case protocolHeader::ackConnection:
  525. {
  526. unixsock->read(&buf1, sizeof(buf1),
  527. CP::Callback(&appConnection_unix::ackConnectionCB, this));
  528. break;
  529. }
  530. default:
  531. startRead();
  532. break;
  533. }
  534. }
  535. appConnection_unix(vhost* vh, CP::Poll& p, string exepath) :
  536. maxID(0), p(p), pid(0), dead(false), down(false) {
  537. int socks[2];
  538. if (socketpair(AF_UNIX, SOCK_STREAM, 0, socks) < 0) {
  539. throw runtime_error(strerror(errno));
  540. }
  541. if (vh->_ipcBufSize > 0) {
  542. int n;
  543. unsigned int n_size = sizeof(n);
  544. n = vh->_ipcBufSize;
  545. setsockopt(socks[0], SOL_SOCKET, SO_RCVBUF, (void *) &n, n_size);
  546. setsockopt(socks[0], SOL_SOCKET, SO_SNDBUF, (void *) &n, n_size);
  547. setsockopt(socks[1], SOL_SOCKET, SO_RCVBUF, (void *) &n, n_size);
  548. setsockopt(socks[1], SOL_SOCKET, SO_SNDBUF, (void *) &n, n_size);
  549. }
  550. pid_t pid = fork();
  551. if (pid < 0) throw runtime_error(strerror(errno));
  552. else if (pid == 0) {
  553. //child
  554. close(socks[0]);
  555. if (socks[1] != 3) {
  556. dup2(socks[1], 3); //fd 3
  557. close(socks[1]);
  558. }
  559. setenv("SOCKETD_FD", "3", 1);
  560. if (vh->preload) {
  561. setenv("LD_PRELOAD", socketd_proxy, 1);
  562. }
  563. if (vh->useShell) execlp("/bin/sh", "/bin/sh", "-c", exepath.c_str(),
  564. (const char*) NULL);
  565. else execlp(exepath.c_str(), exepath.c_str(), (const char*) NULL);
  566. _exit(1);
  567. } else {
  568. //parent
  569. close(socks[1]);
  570. this->pid = pid;
  571. //getsockopt(socks[0], SOL_SOCKET, SO_RCVBUF, (void *) &n, &n_size);
  572. //SOCKETD_DEBUG(8, "unix socket receive buffer size: %i\n", n);
  573. unixsock = RGC::newObj<CP::Socket>(socks[0], AF_UNIX, SOCK_STREAM, 0);
  574. p.add(*unixsock);
  575. //printf("asdfg %i\n", ++asdf);
  576. retain();
  577. startRead();
  578. }
  579. }
  580. appConnection_unix(CP::Socket* sock, CP::Poll& p, socketd* sd) :
  581. maxID(0), p(p), sd(sd), dead(false), down(false) {
  582. }
  583. virtual int passConnection(CP::Socket* s, void* buffer, int buflen, const passConnCB& cb) {
  584. if (dead) return 1;
  585. //printf("passConnection\n");
  586. //s->retain();
  587. int64_t id = (++maxID);
  588. int r;
  589. {
  590. int len = sizeof(protocolHeader) + sizeof(prot_handleConnection);
  591. uint8_t* hdr[len];
  592. memset(hdr, 0, len);
  593. protocolHeader* ph = new (hdr) protocolHeader();
  594. prot_handleConnection* ph1 = new (ph + 1) prot_handleConnection();
  595. ph->type = protocolHeader::handleConnection;
  596. //printf("zxcv %lli\n",id);
  597. ph1->id = id;
  598. ph1->d = s->addressFamily;
  599. ph1->t = s->type;
  600. ph1->p = s->protocol;
  601. ph1->bufferLen = buflen;
  602. //socket has SOCK_NONBLOCK set, so regular send() won't block;
  603. //if the socket buffer is full, then the application is already
  604. //considered dead
  605. r = unixsock->sendAll(hdr, len, MSG_DONTWAIT);
  606. ph->~protocolHeader();
  607. ph1->~prot_handleConnection();
  608. }
  609. if (r <= 0) goto fail;
  610. if (sendfd(unixsock->handle, s->handle) < 0) goto fail;
  611. if (buflen > 0) if (unixsock->sendAll(buffer, buflen, MSG_DONTWAIT) <= 0) goto fail;
  612. //s->release();
  613. pendingConnections.insert( { id, cb });
  614. return 2;
  615. fail: //s->release();
  616. SOCKETD_DEBUG(1, "unix socket buffer overflow; %s\n", strerror(errno));
  617. die(id);
  618. return 1;
  619. }
  620. virtual ~appConnection_unix() {
  621. }
  622. };
  623. void appConnection_unix::startRead() {
  624. unixsock->read(&buf, sizeof(buf), CP::Callback(&appConnection_unix::readCB, this));
  625. }
  626. void spawnApp(vhost* vh, CP::Poll& p, string exepath, int threadID, int i) {
  627. setConn(vh, threadID, i, RGC::newObj<appConnection_unix>(vh, p, exepath));
  628. }
  629. struct socketd_execinfo;
  630. struct socketd_thread
  631. {
  632. socketd* This;
  633. socketd_execinfo* execinfo;
  634. pthread_t thr;
  635. int id;
  636. };
  637. struct socketd_execinfo
  638. {
  639. vector<socketd_thread> threads;
  640. };
  641. void* socketd_processorThread(void* v) {
  642. socketd_thread* th = (socketd_thread*) v;
  643. CP::Poll p;
  644. for (uint32_t i = 0; i < th->This->listens.size(); i++) {
  645. auto& l = th->This->listens[i];
  646. struct cb1
  647. {
  648. CP::Poll& poll;
  649. socketd_thread* th;
  650. listen& l;
  651. Socket s;
  652. cb1(Poll& poll, socketd_thread* th, listen& l) :
  653. poll(poll), th(th), l(l), s(l.socks[th->id], l.d, l.t, l.p) {
  654. s.repeatAcceptHandle(this);
  655. poll.add(s);
  656. }
  657. void operator()(HANDLE h) {
  658. connectionInfo* ci = new connectionInfo(h, l.d, l.t, l.p);
  659. ci->threadID = th->id;
  660. ci->This = th->This;
  661. ci->l = &l;
  662. SOCKETD_DEBUG(9, "req.l.id = %i\n", l.id);
  663. ci->p = &poll;
  664. ci->process();
  665. }
  666. }* cb = new cb1(p, th, l);
  667. }
  668. p.loop();
  669. printf("%i exited\n", th->id);
  670. return NULL;
  671. }
  672. void socketd::run() {
  673. PRINTSIZE(CP::Socket);
  674. PRINTSIZE(CP::newPersistentStreamReader);
  675. PRINTSIZE(connectionInfo);
  676. //ignore SIGCHLD
  677. struct sigaction sa;
  678. sa.sa_handler = SIG_IGN;
  679. sigemptyset(&sa.sa_mask);
  680. sa.sa_flags = SA_RESTART; /* Restart system calls if
  681. interrupted by handler */
  682. sigaction(SIGCHLD, &sa, NULL);
  683. CP::Poll p;
  684. //p.debug=true;
  685. this->bindings.clear();
  686. for (int i = 0; i < (int) vhosts.size(); i++) {
  687. vhosts[i]._processes = ceil(double(vhosts[i].processes) / threads);
  688. if (vhosts[i]._processes < 1) vhosts[i]._processes = 1;
  689. }
  690. perCPUData = new uint8_t*[threads];
  691. for (int i = 0; i < threads; i++) {
  692. int s = 0;
  693. for (int ii = 0; ii < (int) vhosts.size(); ii++) {
  694. s += sizeof(appConnection*) * vhosts[ii]._processes;
  695. s += sizeof(int) * 2;
  696. }
  697. int align = 64;
  698. if (s % align != 0) s = ((s / align) + 1) * align;
  699. uint8_t* tmp; // = aligned_alloc(64, s);
  700. if (posix_memalign((void**) &tmp, align, s) != 0) throw bad_alloc();
  701. memset(tmp, 0, s);
  702. s = 0;
  703. for (int ii = 0; ii < (int) vhosts.size(); ii++) {
  704. vhosts[ii].perCPUData.push_back(tmp + s);
  705. s += sizeof(appConnection*) * vhosts[ii]._processes;
  706. s += sizeof(int) * 2;
  707. }
  708. }
  709. for (uint32_t i = 0; i < vhosts.size(); i++) {
  710. for (uint32_t ii = 0; ii < vhosts[i].bindings.size(); ii++) {
  711. binding* tmp = &(vhosts[i].bindings[ii]);
  712. this->bindings.push_back(tmp);
  713. vhosts[i].bindings[ii].vh = &vhosts[i];
  714. }
  715. vhosts[i]._ipcBufSize = vhosts[i].ipcBufSize < 0 ? this->ipcBufSize : vhosts[i].ipcBufSize;
  716. vhosts[i].hasAttachments = false;
  717. //vhosts[i].conns.resize(nthreads * vhosts[i].processes);
  718. //vhosts[i].curProcess.resize(nthreads);
  719. vhosts[i].perCPUData.resize(threads);
  720. }
  721. for (uint32_t i = 0; i < extraBindings.size(); i++) {
  722. for (uint32_t ii = 0; ii < vhosts.size(); ii++) {
  723. if (vhosts[ii].name == extraBindings[i].vhostName) {
  724. extraBindings[i].vh = &vhosts[ii];
  725. break;
  726. }
  727. }
  728. binding* tmp = &(extraBindings[i]);
  729. this->bindings.push_back(tmp);
  730. }
  731. //start up unix listening socket
  732. /*if (unixAddress.length() > 0) {
  733. CP::Socket* unixsock = new CP::Socket(AF_UNIX, SOCK_STREAM);
  734. unixsock->bind(CP::UNIXEndPoint(unixAddress));
  735. unixsock->listen(8);
  736. unixsock->repeatAccept([](CP::Socket* s) {
  737. });
  738. }*/
  739. SOCKETD_DEBUG(9, "bindings.size() = %i\n", bindings.size());
  740. for (uint32_t i = 0; i < listens.size(); i++) {
  741. auto& l = listens[i];
  742. Socket tmp;
  743. tmp.bind(l.host.c_str(), l.port.c_str(), AF_UNSPEC, SOCK_STREAM);
  744. tmp.listen(l.backlog);
  745. l.d = tmp.addressFamily;
  746. l.t = tmp.type;
  747. l.p = tmp.protocol;
  748. l.socks.resize(threads);
  749. for (int ii = 0; ii < threads; ii++) {
  750. l.socks[ii] = dup(tmp.handle);
  751. }
  752. }
  753. socketd_execinfo execinfo;
  754. printf("this=%p\n", this);
  755. execinfo.threads.resize(threads);
  756. SOCKETD_DEBUG(3, "starting %i threads\n", threads);
  757. for (int i = 0; i < threads; i++) {
  758. socketd_thread& th = execinfo.threads[i];
  759. th.This = this;
  760. th.execinfo = &execinfo;
  761. th.id = i;
  762. //printf("%p %p\n",&th, &th1);
  763. if (pthread_create(&th.thr, NULL, socketd_processorThread, &th) != 0) {
  764. throw runtime_error(strerror(errno));
  765. }
  766. }
  767. while (true)
  768. sleep(3600);
  769. }
  770. }