cpoll.C 74 KB


  1. /*
  2. * cpoll.C
  3. *
  4. * Created on: 2012-09-14
  5. * Author: xaxaxa
  6. */
  7. /*
  8. This program is free software: you can redistribute it and/or modify
  9. it under the terms of the GNU General Public License as published by
  10. the Free Software Foundation, either version 3 of the License, or
  11. (at your option) any later version.
  12. This program is distributed in the hope that it will be useful,
  13. but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. GNU General Public License for more details.
  16. You should have received a copy of the GNU General Public License
  17. along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. * */
  19. #include "include/cpoll.H"
  20. #include "include/cpoll_internal.H"
  21. #include <unistd.h>
  22. #include <fcntl.h>
  23. #include <stdexcept>
  24. #include "include/statemachines.H"
  25. #include <dirent.h>
  26. #include <sys/socket.h>
  27. #include <netdb.h>
  28. #include <sstream>
  29. #include <sys/timerfd.h>
  30. #include <algorithm>
  31. namespace CP
  32. {
  33. //CPollException
  34. CPollException::CPollException() :
  35. message(strerror(errno)), number(errno) {
  36. }
  37. CPollException::CPollException(int32_t number) :
  38. message(strerror(number)), number(number) {
  39. }
  40. CPollException::CPollException(string message, int32_t number) :
  41. message(message), number(number) {
  42. }
  43. CPollException::~CPollException() throw () {
  44. }
  45. const char* CPollException::what() const throw () {
  46. return message.c_str();
  47. }
  48. AbortException::AbortException() {
  49. }
  50. AbortException::~AbortException() throw () {
  51. }
  52. const char* AbortException::what() const throw () {
  53. return "aborting cpoll loop";
  54. }
  55. CancelException::CancelException() {
  56. }
  57. CancelException::~CancelException() throw () {
  58. }
  59. const char* CancelException::what() const throw () {
  60. return "cancelling current cpoll operation";
  61. }
  62. vector<RGC::Ref<EndPoint> > EndPoint::lookupHost(const char* hostname, const char* port,
  63. int32_t family, int32_t socktype, int32_t proto, int32_t flags) {
  64. vector<RGC::Ref<EndPoint> > tmp;
  65. addrinfo hints, *result, *rp;
  66. memset(&hints, 0, sizeof(struct addrinfo));
  67. hints.ai_family = family; /* Allow IPv4 or IPv6 */
  68. hints.ai_socktype = socktype;
  69. hints.ai_flags = flags;
  70. hints.ai_protocol = proto;
  71. int32_t s = getaddrinfo(hostname, port, &hints, &result);
  72. if (s != 0) {
  73. throw CPollException(gai_strerror(s));
  74. }
  75. for (rp = result; rp != NULL; rp = rp->ai_next) {
  76. EndPoint* ep = fromSockAddr(rp->ai_addr);
  77. tmp.push_back(ep);
  78. ep->release();
  79. }
  80. freeaddrinfo(result);
  81. return tmp;
  82. }
  83. EndPoint* EndPoint::fromSockAddr(const sockaddr* addr) {
  84. switch (addr->sa_family) {
  85. case AF_INET:
  86. return new IPEndPoint(*((sockaddr_in*) addr));
  87. case AF_INET6:
  88. return new IPv6EndPoint(*((sockaddr_in6*) addr));
  89. case AF_UNIX:
  90. return new UNIXEndPoint(*((sockaddr_un*) addr));
  91. default:
  92. return NULL;
  93. }
  94. }
  95. EndPoint* EndPoint::create(int32_t addressFamily) {
  96. switch (addressFamily) {
  97. case AF_INET:
  98. return new IPEndPoint();
  99. case AF_INET6:
  100. return new IPv6EndPoint();
  101. case AF_UNIX:
  102. return new UNIXEndPoint();
  103. default:
  104. return NULL;
  105. }
  106. }
  107. int EndPoint::getSize(int32_t addressFamily) {
  108. switch (addressFamily) {
  109. case AF_INET:
  110. return sizeof(IPEndPoint);
  111. case AF_INET6:
  112. return sizeof(IPv6EndPoint);
  113. case AF_UNIX:
  114. return sizeof(UNIXEndPoint);
  115. default:
  116. return 0;
  117. }
  118. }
  119. EndPoint* EndPoint::construct(void* mem, int32_t addressFamily) {
  120. switch (addressFamily) {
  121. case AF_INET:
  122. return new (mem) IPEndPoint;
  123. case AF_INET6:
  124. return new (mem) IPv6EndPoint;
  125. case AF_UNIX:
  126. return new (mem) UNIXEndPoint;
  127. default:
  128. return NULL;
  129. }
  130. }
  131. //IPEndPoint
  132. IPEndPoint::IPEndPoint() {
  133. this->addressFamily = AF_INET;
  134. }
  135. IPEndPoint::IPEndPoint(IPAddress address, in_port_t port) {
  136. this->addressFamily = AF_INET;
  137. this->address = address;
  138. this->port = port;
  139. }
  140. void IPEndPoint::set_addr(const sockaddr_in& addr) {
  141. this->addressFamily = AF_INET;
  142. this->address = IPAddress(addr.sin_addr);
  143. this->port = ntohs(addr.sin_port);
  144. }
  145. void IPEndPoint::setSockAddr(const sockaddr* addr) {
  146. if (addr->sa_family != AF_INET) throw CPollException(
  147. "attemting to set the address of an IPEndPoint to a sockaddr that is not AF_INET");
  148. set_addr(*(sockaddr_in*) addr);
  149. }
  150. IPEndPoint::IPEndPoint(const sockaddr_in& addr) {
  151. set_addr(addr);
  152. }
  153. void IPEndPoint::getSockAddr(sockaddr* addr) const {
  154. sockaddr_in* addr_in = (sockaddr_in*) addr;
  155. addr_in->sin_family = AF_INET;
  156. addr_in->sin_port = htons(port);
  157. addr_in->sin_addr = address.a;
  158. }
  159. int32_t IPEndPoint::getSockAddrSize() const {
  160. return sizeof(sockaddr_in);
  161. }
  162. void IPEndPoint::clone(EndPoint& to) const {
  163. if (to.addressFamily != addressFamily) throw CPollException(
  164. "attempting to clone an EndPoint to another EndPoint with a different addressFamily");
  165. IPEndPoint& tmp((IPEndPoint&) to);
  166. tmp.address = address;
  167. tmp.port = port;
  168. }
  169. string IPEndPoint::toStr() const {
  170. stringstream s;
  171. s << address.toStr() << ':' << port;
  172. return s.str();
  173. }
  174. //IPv6EndPoint
  175. IPv6EndPoint::IPv6EndPoint() {
  176. this->addressFamily = AF_INET6;
  177. }
  178. IPv6EndPoint::IPv6EndPoint(IPv6Address address, in_port_t port) {
  179. this->addressFamily = AF_INET6;
  180. this->address = address;
  181. this->port = port;
  182. }
  183. void IPv6EndPoint::set_addr(const sockaddr_in6& addr) {
  184. this->addressFamily = AF_INET6;
  185. this->address = IPv6Address(addr.sin6_addr);
  186. this->port = ntohs(addr.sin6_port);
  187. flowInfo = addr.sin6_flowinfo;
  188. scopeID = addr.sin6_scope_id;
  189. }
  190. IPv6EndPoint::IPv6EndPoint(const sockaddr_in6& addr) {
  191. set_addr(addr);
  192. }
  193. void IPv6EndPoint::setSockAddr(const sockaddr* addr) {
  194. if (addr->sa_family != AF_INET6) throw CPollException(
  195. "attemting to set the address of an IPv6EndPoint to a sockaddr that is not AF_INET6");
  196. set_addr(*(sockaddr_in6*) addr);
  197. }
  198. void IPv6EndPoint::getSockAddr(sockaddr* addr) const {
  199. sockaddr_in6* addr_in = (sockaddr_in6*) addr;
  200. addr_in->sin6_family = AF_INET6;
  201. addr_in->sin6_port = htons(port);
  202. addr_in->sin6_addr = address.a;
  203. addr_in->sin6_flowinfo = flowInfo;
  204. addr_in->sin6_scope_id = scopeID;
  205. }
  206. int32_t IPv6EndPoint::getSockAddrSize() const {
  207. return sizeof(sockaddr_in);
  208. }
  209. void IPv6EndPoint::clone(EndPoint& to) const {
  210. if (to.addressFamily != addressFamily) throw CPollException(
  211. "attempting to clone an EndPoint to another EndPoint with a different addressFamily");
  212. IPv6EndPoint& tmp((IPv6EndPoint&) to);
  213. tmp.address = address;
  214. tmp.port = port;
  215. tmp.flowInfo = flowInfo;
  216. tmp.scopeID = scopeID;
  217. }
  218. string IPv6EndPoint::toStr() const {
  219. stringstream s;
  220. s << '[' << address.toStr() << "]:" << port;
  221. return s.str();
  222. }
  223. //UNIXEndPoint
  224. UNIXEndPoint::UNIXEndPoint() {
  225. this->addressFamily = AF_UNIX;
  226. }
  227. UNIXEndPoint::UNIXEndPoint(string name) {
  228. this->addressFamily = AF_UNIX;
  229. this->name = name;
  230. }
  231. void UNIXEndPoint::set_addr(const sockaddr_un& addr) {
  232. this->addressFamily = AF_UNIX;
  233. this->name = addr.sun_path;
  234. }
  235. UNIXEndPoint::UNIXEndPoint(const sockaddr_un& addr) {
  236. set_addr(addr);
  237. }
  238. void UNIXEndPoint::setSockAddr(const sockaddr* addr) {
  239. if (addr->sa_family != AF_UNIX) throw CPollException(
  240. "attemting to set the address of an UNIXEndPoint to a sockaddr that is not AF_UNIX");
  241. set_addr(*(sockaddr_un*) addr);
  242. }
  243. void UNIXEndPoint::getSockAddr(sockaddr* addr) const {
  244. sockaddr_un* a = (sockaddr_un*) addr;
  245. a->sun_family = AF_UNIX;
  246. strncpy(a->sun_path, name.c_str(), name.length());
  247. a->sun_path[name.length()] = '\0';
  248. }
  249. int32_t UNIXEndPoint::getSockAddrSize() const {
  250. return sizeof(sa_family_t) + name.length() + 1;
  251. }
  252. void UNIXEndPoint::clone(EndPoint& to) const {
  253. if (to.addressFamily != addressFamily) throw CPollException(
  254. "attempting to clone an EndPoint to another EndPoint with a different addressFamily");
  255. UNIXEndPoint& tmp((UNIXEndPoint&) to);
  256. tmp.name = name;
  257. }
  258. string UNIXEndPoint::toStr() const {
  259. return name;
  260. //XXX
  261. }
  262. static void Stream_readCB(Stream* This, int i);
  263. static void Stream_beginRead(Stream* This) {
  264. auto& tmp = This->_readToEnd;
  265. auto* out = tmp.out;
  266. if (out->bufferSize - out->bufferPos < tmp.bufSize) out->flushBuffer(tmp.bufSize);
  267. This->read(out->buffer + out->bufferPos, out->bufferSize - out->bufferPos, { &Stream_readCB,
  268. This });
  269. }
  270. static void Stream_readCB(Stream* This, int i) {
  271. auto& tmp = This->_readToEnd;
  272. if (i <= 0) {
  273. tmp.cb(tmp.br);
  274. return;
  275. }
  276. tmp.out->bufferPos += i;
  277. tmp.br += i;
  278. Stream_beginRead(This);
  279. }
  280. static void Stream_readCB1(Stream* This, int i);
  281. static void Stream_beginRead1(Stream* This) {
  282. auto& tmp = This->_readChunked;
  283. auto* out = tmp.out;
  284. int x = (tmp.len - tmp.br) > tmp.bufSize ? tmp.bufSize : (tmp.len - tmp.br);
  285. if (x <= 0) {
  286. tmp.cb(tmp.br);
  287. return;
  288. }
  289. if (out->bufferSize - out->bufferPos < x) out->flushBuffer(x);
  290. This->read(out->buffer + out->bufferPos, x, { &Stream_readCB1, This });
  291. }
  292. static void Stream_readCB1(Stream* This, int i) {
  293. auto& tmp = This->_readChunked;
  294. if (i <= 0) {
  295. tmp.cb(tmp.br);
  296. return;
  297. }
  298. tmp.out->bufferPos += i;
  299. tmp.br += i;
  300. Stream_beginRead1(This);
  301. }
  302. static inline void Stream_beginReadv(Stream* This) {
  303. if (This->_readvAll.i < This->_readvAll.iovcnt) This->readv(
  304. This->_readvAll.iov + This->_readvAll.i, This->_readvAll.iovcnt - This->_readvAll.i, {
  305. &Stream::_readvCB, This });
  306. else {
  307. This->_readvAll.cb(This->_readvAll.br);
  308. }
  309. }
  310. static inline void Stream_beginReadAll(Stream* This) {
  311. if (This->_readAll.i < This->_readAll.len) This->read(This->_readAll.buf + This->_readAll.i,
  312. This->_readAll.len - This->_readAll.i, { &Stream::_readAllCB, This });
  313. else {
  314. This->_readAll.cb(This->_readAll.i);
  315. }
  316. }
  317. void Stream::_readvCB(int r) {
  318. if (r <= 0) {
  319. _readvAll.cb(_readvAll.br);
  320. return;
  321. }
  322. _readvAll.br += r;
  323. while (r > 0 && _readvAll.i < _readvAll.iovcnt) {
  324. if ((int) _readvAll.iov[_readvAll.i].iov_len > r) {
  325. _readvAll.iov[_readvAll.i].iov_base = ((uint8_t*) _readvAll.iov[_readvAll.i].iov_base)
  326. + r;
  327. _readvAll.iov[_readvAll.i].iov_len -= r;
  328. break;
  329. } else {
  330. r -= _readvAll.iov[_readvAll.i].iov_len;
  331. _readvAll.i++;
  332. }
  333. }
  334. Stream_beginReadv(this);
  335. }
  336. void Stream::_readAllCB(int r) {
  337. if (r <= 0) {
  338. _readAll.cb(_readAll.i);
  339. return;
  340. }
  341. _readAll.i += r;
  342. Stream_beginReadAll(this);
  343. }
  344. static inline void Stream_beginWritev(Stream* This) {
  345. if (This->_writevAll.i < This->_writevAll.iovcnt) This->writev(
  346. This->_writevAll.iov + This->_writevAll.i, This->_writevAll.iovcnt - This->_writevAll.i,
  347. { &Stream::_writevCB, This });
  348. else {
  349. This->_writevAll.cb(This->_writevAll.br);
  350. }
  351. }
  352. static inline void Stream_beginWriteAll(Stream* This) {
  353. if (This->_writeAll.i < This->_writeAll.len) This->write(
  354. This->_writeAll.buf + This->_writeAll.i, This->_writeAll.len - This->_writeAll.i, {
  355. &Stream::_writeAllCB, This });
  356. else {
  357. This->_writeAll.cb(This->_writeAll.i);
  358. }
  359. }
  360. void Stream::_writevCB(int r) {
  361. if (r <= 0) {
  362. _writevAll.cb(_writevAll.br);
  363. return;
  364. }
  365. _writevAll.br += r;
  366. while (r > 0 && _writevAll.i < _writevAll.iovcnt) {
  367. if ((int) _writevAll.iov[_writevAll.i].iov_len > r) {
  368. _writevAll.iov[_writevAll.i].iov_base =
  369. ((uint8_t*) _writevAll.iov[_writevAll.i].iov_base) + r;
  370. _writevAll.iov[_writevAll.i].iov_len -= r;
  371. break;
  372. } else {
  373. r -= _writevAll.iov[_writevAll.i].iov_len;
  374. _writevAll.i++;
  375. }
  376. }
  377. Stream_beginWritev(this);
  378. }
  379. void Stream::_writeAllCB(int r) {
  380. if (r <= 0) {
  381. _writeAll.cb(_writeAll.i);
  382. return;
  383. }
  384. _writeAll.i += r;
  385. Stream_beginWriteAll(this);
  386. }
  387. int Stream::readToEnd(BufferedOutput& out, int32_t bufSize) {
  388. int r = 0;
  389. while (true) {
  390. if (out.bufferSize - out.bufferPos < bufSize) out.flushBuffer(bufSize);
  391. int i = read(out.buffer + out.bufferPos, out.bufferSize - out.bufferPos);
  392. if (i <= 0) return r;
  393. out.bufferPos += i;
  394. r += i;
  395. }
  396. }
  397. int Stream::readChunked(BufferedOutput& out, int32_t len, int32_t bufSize) {
  398. int r = 0;
  399. while (true) {
  400. int x = (len - r) > bufSize ? bufSize : (len - r);
  401. if (x <= 0) return r;
  402. if (out.bufferSize - out.bufferPos < x) out.flushBuffer(x);
  403. int i = read(out.buffer + out.bufferPos, x);
  404. if (i <= 0) return r;
  405. out.bufferPos += i;
  406. r += i;
  407. }
  408. }
  409. void Stream::readToEnd(BufferedOutput& out, const Callback& cb, int32_t bufSize) {
  410. _readToEnd.cb = cb;
  411. _readToEnd.bufSize = bufSize;
  412. _readToEnd.out = &out;
  413. _readToEnd.br = 0;
  414. Stream_beginRead(this);
  415. }
  416. void Stream::readChunked(BufferedOutput& out, int32_t len, const Callback& cb, int32_t bufSize) {
  417. _readChunked.cb = cb;
  418. _readChunked.bufSize = bufSize;
  419. _readChunked.out = &out;
  420. _readChunked.len = len;
  421. _readChunked.br = 0;
  422. Stream_beginRead1(this);
  423. }
  424. BufferedOutput* Stream::getBufferedOutput() {
  425. return NULL;
  426. }
  427. void Stream::readvAll(iovec* iov, int iovcnt, const Callback& cb) {
  428. _readvAll= {cb,iov,iovcnt,0,0};
  429. Stream_beginReadv(this);
  430. }
  431. void Stream::readAll(void* buf, int32_t len, const Callback& cb) {
  432. _readAll= {cb,(uint8_t*)buf,len,0};
  433. Stream_beginReadAll(this);
  434. }
  435. void Stream::writevAll(iovec* iov, int iovcnt, const Callback& cb) {
  436. _writevAll= {cb,iov,iovcnt,0,0};
  437. Stream_beginWritev(this);
  438. }
  439. void Stream::writeAll(const void* buf, int32_t len, const Callback& cb) {
  440. _writeAll= {cb,(const uint8_t*)buf,len,0};
  441. Stream_beginWriteAll(this);
  442. }
  443. StreamWriter::StreamWriter(BufferedOutput& s) :
  444. outp(&s), buffer(&s), sb(*(StreamBuffer*) nullptr) {
  445. }
  446. StreamWriter::StreamWriter(Stream& s) :
  447. outp(&s), buffer(s.getBufferedOutput()),
  448. sb(buffer == NULL ? *(new (_sb) StreamBuffer(s)) : *(StreamBuffer*) nullptr) {
  449. if (buffer == NULL) buffer = &sb;
  450. }
  451. StreamWriter::StreamWriter(MemoryStream& s) :
  452. outp(&s), buffer(&s), sb(*(StreamBuffer*) nullptr) {
  453. }
  454. StreamWriter::StreamWriter(StringStream& s) :
  455. outp(&s), buffer(&s), sb(*(StreamBuffer*) nullptr) {
  456. }
  457. StreamWriter::~StreamWriter() {
  458. flush();
  459. if (buffer == &sb) sb.~StreamBuffer();
  460. }
  461. StreamBuffer::StreamBuffer() {
  462. this->buffer = NULL;
  463. }
  464. StreamBuffer::StreamBuffer(Stream& s, int bufsize) :
  465. BufferedOutput((uint8_t*) malloc(bufsize), 0, bufsize), output(s) {
  466. if (this->buffer == NULL) throw bad_alloc();
  467. }
  468. void StreamBuffer::flushBuffer(int minBufferAllocation) {
  469. if (bufferPos <= 0) return;
  470. if (minBufferAllocation > bufferSize) {
  471. int bs = bufferSize;
  472. do {
  473. bs *= 2;
  474. } while (minBufferAllocation > bs);
  475. void* newbuffer = realloc(buffer, bs);
  476. if (newbuffer == NULL) throw bad_alloc();
  477. buffer = (uint8_t*) newbuffer;
  478. bufferSize = bs;
  479. }
  480. output->write(buffer, bufferPos);
  481. bufferPos = 0;
  482. }
  483. StreamReader::StreamReader(Stream& input, int bufsize) :
  484. input(&input), _sr(malloc(bufsize), bufsize), deletionFlag(NULL), bufSize(bufsize),
  485. eof(false) {
  486. //sr = malloc(streamReader_getSize() + bufsize);
  487. if (_sr.buffer == NULL) throw bad_alloc();
  488. }
  489. StreamReader::~StreamReader() {
  490. if (deletionFlag != NULL) *deletionFlag = true;
  491. free(_sr.buffer);
  492. }
  493. void StreamReader_checkReading1(StreamReader* This) {
  494. //if (This->shouldRead) throw CPollException("StreamReader is already reading");
  495. }
  496. void StreamReader_checkReading(StreamReader* This) {
  497. //StreamReader_checkReading1(This);
  498. //This->shouldRead = true;
  499. }
  500. void StreamReader_prepareAsyncRead(StreamReader* This, const StreamReader::Callback& cb) {
  501. This->cb = cb;
  502. This->out_s = NULL;
  503. StreamReader_checkReading(This);
  504. }
  505. void StreamReader_prepareSyncRead(StreamReader* This) {
  506. This->cb = nullptr;
  507. This->out_s = NULL;
  508. This->tmp.clear();
  509. StreamReader_checkReading(This);
  510. }
  511. void StreamReader_prepareAsyncReadStream(StreamReader* This, Stream& s,
  512. const StreamReader::StreamCallback& cb) {
  513. This->cb_s = cb;
  514. This->out_s = &s;
  515. This->tmp_i = 0;
  516. StreamReader_checkReading(This);
  517. }
  518. void StreamReader_prepareSyncReadStream(StreamReader* This, Stream& s) {
  519. This->cb_s = nullptr;
  520. This->out_s = &s;
  521. This->tmp_i = 0;
  522. StreamReader_checkReading(This);
  523. }
  524. void StreamReader::readTo(char delim, const Callback& cb) {
  525. StreamReader_prepareAsyncRead(this, cb);
  526. _sr.readUntilChar(delim);
  527. _loop(true);
  528. }
  529. void StreamReader::readTo(const char* delim, int delimLen, const Callback& cb) {
  530. StreamReader_prepareAsyncRead(this, cb);
  531. _sr.readUntilString(delim, delimLen);
  532. _loop(true);
  533. }
  534. void StreamReader::readTo(string delim, const Callback& cb) {
  535. StreamReader_prepareAsyncRead(this, cb);
  536. tmp_delim = delim;
  537. _sr.readUntilString(tmp_delim.data(), tmp_delim.length());
  538. _loop(true);
  539. }
  540. void StreamReader::readLine(const Callback& cb) {
  541. readTo('\n', cb);
  542. }
  543. string StreamReader::readTo(char delim) {
  544. StreamReader_prepareSyncRead(this);
  545. _sr.readUntilChar(delim);
  546. _doSyncRead();
  547. return this->tmp;
  548. }
  549. string StreamReader::readTo(const char* delim, int delimLen) {
  550. StreamReader_prepareSyncRead(this);
  551. _sr.readUntilString(delim, delimLen);
  552. _doSyncRead();
  553. return this->tmp;
  554. }
  555. string StreamReader::readTo(string delim) {
  556. StreamReader_prepareSyncRead(this);
  557. _sr.readUntilString(delim.data(), delim.length());
  558. _doSyncRead();
  559. return this->tmp;
  560. }
  561. string StreamReader::readLine() {
  562. return readTo('\n');
  563. }
  564. int StreamReader::readTo(char delim, Stream& s) {
  565. StreamReader_prepareSyncReadStream(this, s);
  566. _sr.readUntilChar(delim);
  567. _doSyncRead();
  568. return this->tmp_i;
  569. }
  570. int StreamReader::readTo(const char* delim, int delimLen, Stream& s) {
  571. StreamReader_prepareSyncReadStream(this, s);
  572. _sr.readUntilString(delim, delimLen);
  573. _doSyncRead();
  574. return this->tmp_i;
  575. }
  576. int StreamReader::readTo(string delim, Stream& s) {
  577. StreamReader_prepareSyncReadStream(this, s);
  578. tmp_delim = delim;
  579. _sr.readUntilString(tmp_delim.data(), tmp_delim.length());
  580. _doSyncRead();
  581. return this->tmp_i;
  582. }
  583. int StreamReader::readLine(Stream& s) {
  584. return readTo('\n', s);
  585. }
  586. void StreamReader::readTo(char delim, Stream& s, const StreamCallback& cb) {
  587. StreamReader_prepareAsyncReadStream(this, s, cb);
  588. _sr.readUntilChar(delim);
  589. _loop(true);
  590. }
  591. void StreamReader::readTo(const char* delim, int delimLen, Stream& s, const StreamCallback& cb) {
  592. StreamReader_prepareAsyncReadStream(this, s, cb);
  593. _sr.readUntilString(delim, delimLen);
  594. _loop(true);
  595. }
  596. void StreamReader::readTo(string delim, Stream& s, const StreamCallback& cb) {
  597. StreamReader_prepareAsyncReadStream(this, s, cb);
  598. tmp_delim = delim;
  599. _sr.readUntilString(tmp_delim.data(), tmp_delim.length());
  600. _loop(true);
  601. }
  602. void StreamReader::readLine(Stream& s, const StreamCallback& cb) {
  603. readTo('\n', s, cb);
  604. }
  605. int32_t StreamReader::read(void* buf, int32_t len) {
  606. void* tmp;
  607. int l = readBuffer(tmp, len);
  608. if (l <= 0) {
  609. freeBuffer(tmp, l);
  610. return input->read(buf, len);
  611. }
  612. memcpy(buf, tmp, l);
  613. freeBuffer(tmp, l);
  614. return l;
  615. }
  616. void StreamReader::read(void* buf, int32_t len, const CP::Callback& cb, bool repeat) {
  617. void* tmp;
  618. int l = readBuffer(tmp, len);
  619. if (l <= 0) {
  620. freeBuffer(tmp, l);
  621. return input->read(buf, len, cb, repeat);
  622. }
  623. memcpy(buf, tmp, l);
  624. freeBuffer(tmp, l);
  625. cb(l);
  626. }
  627. void StreamReader::readAll(void* buf, int32_t len, const CP::Callback& cb) {
  628. void* tmp;
  629. int l = readBuffer(tmp, len);
  630. if (l < len) {
  631. freeBuffer(tmp, l);
  632. return input->readAll(((uint8_t*) buf) + l, len - l, cb);
  633. }
  634. memcpy(buf, tmp, l);
  635. cb(l);
  636. freeBuffer(tmp, l);
  637. }
  638. void StreamReader::close() {
  639. input->close();
  640. }
  641. void StreamReader::flush() {
  642. input->flush();
  643. }
  644. void StreamReader::close(const CP::Callback& cb) {
  645. input->close(cb);
  646. }
  647. void StreamReader::flush(const CP::Callback& cb) {
  648. input->flush(cb);
  649. }
  650. int32_t StreamReader::readBuffer(void*& buf, int32_t maxlen) {
  651. StreamReader_checkReading1(this);
  652. String tmp = _sr.getBufferData();
  653. if (tmp.len > maxlen) tmp.len = maxlen;
  654. if (tmp.len <= 0) return 0;
  655. buf = tmp.data();
  656. return tmp.len;
  657. }
  658. void StreamReader::freeBuffer(void* buf, int32_t len) {
  659. _sr.skip(len);
  660. }
  661. bool StreamReader::_loop(bool async) {
  662. newStreamReader::item it;
  663. bool delFlag = false;
  664. deletionFlag = &delFlag;
  665. while (_sr.process(it)) {
  666. if (out_s != NULL) {
  667. out_s->write(it.data.data(), it.data.length());
  668. tmp_i += it.data.length();
  669. } else tmp.append(it.data.data(), it.data.length());
  670. if (it.delimReached) {
  671. if (out_s == NULL) {
  672. if (cb == nullptr) goto skipRead;
  673. bool* delFlag = deletionFlag;
  674. cb(tmp);
  675. if (*delFlag) goto skipRead;
  676. tmp.clear();
  677. } else {
  678. if (cb_s == nullptr) goto skipRead;
  679. cb_s(tmp_i);
  680. }
  681. return false;
  682. }
  683. }
  684. if (async) {
  685. _beginRead();
  686. deletionFlag = NULL;
  687. return true;
  688. }
  689. skipRead: if (!delFlag) deletionFlag = NULL;
  690. return false;
  691. }
  692. void StreamReader::_beginRead() {
  693. String buf = _sr.beginPutData();
  694. if (buf.data() == NULL) return;
  695. input->read(buf.data(), buf.length(), CP::Callback(&StreamReader::_readCB, this));
  696. }
  697. void StreamReader::_doSyncRead() {
  698. while (_loop(false)) {
  699. String buf = _sr.beginPutData();
  700. //if (get < 1 > (buf) <= 0) return;
  701. int r = input->read(buf.data(), buf.length());
  702. if (r <= 0) {
  703. String tmp = _sr.getBufferData();
  704. if (out_s == NULL) {
  705. string tmps = tmp.toSTDString();
  706. eof = true;
  707. _sr.reset();
  708. cb(tmps);
  709. } else {
  710. out_s->write(tmp.data(), tmp.length());
  711. tmp_i += tmp.length();
  712. _sr.reset();
  713. cb_s(tmp_i);
  714. }
  715. return;
  716. } else {
  717. _sr.endPutData(r);
  718. }
  719. }
  720. }
  721. void StreamReader::_readCB(int i) {
  722. if (i <= 0) {
  723. String tmp = _sr.getBufferData();
  724. eof = true;
  725. if (out_s == NULL) {
  726. string tmps = tmp.toSTDString();
  727. _sr.reset();
  728. cb(tmps);
  729. } else {
  730. out_s->write(tmp.data(), tmp.length());
  731. tmp_i += tmp.length();
  732. _sr.reset();
  733. cb_s(tmp_i);
  734. }
  735. } else {
  736. _sr.endPutData(i);
  737. _loop(true);
  738. }
  739. }
  740. void StreamReader::cancelRead() {
  741. input->cancelRead();
  742. }
  743. void StreamReader::cancelWrite() {
  744. input->cancelWrite();
  745. }
  746. Handle::Handle() {
  747. deinit();
  748. }
  749. Handle::Handle(HANDLE handle) {
  750. init(handle);
  751. }
  752. void Handle::init(HANDLE handle) {
  753. this->handle = checkError(handle);
  754. }
  755. void Handle::deinit() {
  756. _supportsEPoll = true;
  757. handle = -1;
  758. }
  759. Events Handle::dispatchMultiple(Events events, Events confident, const EventData& evtd) {
  760. //cout << (int32_t)events << endl;
  761. Events ret = Events::none;
  762. for (int32_t i = 0; i < numEvents; i++) {
  763. Events e = indexToEvent(i);
  764. //cout << (int32_t)e << " " << (((event_t)e)&((event_t)events)) << endl;
  765. if ((((event_t) e) & ((event_t) events)) == (event_t) e) {
  766. if (dispatch(e, evtd, (confident & e) == e)) ret |= e;
  767. }
  768. }
  769. //cout << ret << endl;
  770. return ret;
  771. }
  772. Events Handle::wait(EventData& evtd) { //since this is single-file, poll() will be used.
  773. //Events events=Events::none;
  774. Events w = getEvents();
  775. pollfd pfd;
  776. pfd.fd = handle;
  777. pfd.events = eventsToPoll(w);
  778. if (pfd.events == 0) return Events::none;
  779. poll(&pfd, 1, -1);
  780. evtd.hungUp = (pfd.revents & POLLHUP);
  781. evtd.error = (pfd.revents & POLLERR);
  782. /*for(int32_t i=0;i<numEvents;i++) {
  783. Events e=indexToEvent(i);
  784. short p=eventToPoll(e);
  785. if(p&pfd.revents!=0) events=(Events)((event_t)events | (event_t)e);
  786. }*/
  787. return pollToEvents(pfd.revents);
  788. }
  789. Events Handle::waitAndDispatch() {
  790. EventData evtd;
  791. Events e = wait(evtd);
  792. if (e == Events::none) return e;
  793. return dispatchMultiple(e, e, evtd);
  794. }
  795. void Handle::loop() {
  796. try {
  797. while (waitAndDispatch() != Events::none)
  798. ;
  799. } catch (const AbortException& ex) {
  800. }
  801. }
  802. /*void Handle::close() {
  803. ::close(handle);
  804. }*/
  805. Handle::~Handle() {
  806. //if (onClose != nullptr) onClose();
  807. //::close(handle);
  808. }
  // True when errno currently says "the operation would block"
  // (EAGAIN or EWOULDBLOCK).
  static inline bool isWouldBlock() {
      const int err = errno;
      if (err == EWOULDBLOCK) return true;
      return err == EAGAIN;
  }
  812. //File
  813. File::File() :
  814. deletionFlag(NULL), dispatching(false) {
  815. }
  816. File::File(HANDLE handle) :
  817. deletionFlag(NULL), dispatching(false) {
  818. init(handle);
  819. }
  820. void File::init(HANDLE handle) {
  821. Handle::init(handle);
  822. }
  823. Events File::_getEvents() {
  824. if (dispatching) return preDispatchEvents;
  825. Events e = Events::none;
  826. for (int32_t i = 0; i < numEvents; i++)
  827. if (eventData[i].state != EventHandlerData::States::invalid) (event_t&) e |=
  828. (event_t) indexToEvent(i);
  829. //cout << "_getEvents: " << (int32_t)e << endl;
  830. return e;
  831. }
  832. ///only accepts one event
  833. EventHandlerData* File::beginAddEvent(Events event) {
  834. int i = eventToIndex(event);
  835. EventHandlerData *ed = &eventData[i];
  836. if (ed->state != EventHandlerData::States::invalid) throw CPollException(
  837. "Already listening for the specified event on the specified file. "
  838. "For example, you may not read() and recv() on one socket at the same time.");
  839. eventData[i].opcb = nullptr;
  840. return ed;
  841. }
  842. void File::endAddEvent(Events event, bool repeat) {
  843. Events old_events = _getEvents();
  844. int i = eventToIndex(event);
  845. eventData[i].state =
  846. repeat ? (EventHandlerData::States::repeat) : (EventHandlerData::States::once);
  847. if (onEventsChange != nullptr && !dispatching) onEventsChange(*this, old_events);
  848. }
  849. void File::cancel(Events event) {
  850. Events old_events = _getEvents();
  851. eventData[eventToIndex(event)].state = EventHandlerData::States::invalid;
  852. if (onEventsChange != nullptr && !dispatching) onEventsChange(*this, old_events);
  853. }
  854. int32_t File::read(void* buf, int32_t len) {
  855. return ::read(handle, buf, len);
  856. }
  857. int32_t File::write(const void* buf, int32_t len) {
  858. return ::write(handle, buf, len);
  859. }
  860. /*int32_t File::writeAll(const void* buf, int32_t len) {
  861. int32_t bw = 0, bw1 = 0;
  862. while (bw < len && (bw1 = write(((char*) buf) + bw, len - bw)) > 0)
  863. bw += bw1;
  864. return (bw1 < 0 && bw <= 0) ? -1 : bw;
  865. }*/
  866. int32_t File::send(const void* buf, int32_t len, int32_t flags) {
  867. return ::send(handle, buf, len, flags);
  868. }
  869. int32_t File::sendAll(const void* buf, int32_t len, int32_t flags) {
  870. int32_t bw = 0, bw1 = 0;
  871. while (bw < len && (bw1 = send(((char*) buf) + bw, len - bw, flags)) > 0)
  872. bw += bw1;
  873. return (bw1 < 0 && bw <= 0) ? -1 : bw;
  874. }
  875. int32_t File::recv(void* buf, int32_t len, int32_t flags) {
  876. return ::recv(handle, buf, len, flags);
  877. }
  878. int32_t File::recvAll(void* buf, int32_t len, int32_t flags) {
  879. int32_t bw = 0, bw1 = 0;
  880. while (bw < len && (bw1 = recv(((char*) buf) + bw, len - bw, flags)) > 0)
  881. bw += bw1;
  882. return (bw1 < 0 && bw <= 0) ? -1 : bw;
  883. }
  884. Events File::checkEvents(Events events) {
  885. pollfd pfd;
  886. pfd.fd = handle;
  887. pfd.events = eventsToPoll(events);
  888. poll(&pfd, 1, 0);
  889. return pollToEvents(pfd.revents);
  890. }
  891. bool File::doOperation(Events event, EventHandlerData& ed, const EventData& evtd,
  892. EventHandlerData::States oldstate, bool confident) {
  893. Operations op = ed.op;
  894. int32_t r;
  895. redo: r = 0;
  896. if (unlikely(handle<0)) {
  897. r = -1;
  898. goto asdf;
  899. }
  900. switch (op) {
  901. case Operations::read:
  902. r = read(ed.misc.bufferIO.buf, ed.misc.bufferIO.len);
  903. break;
  904. case Operations::readv:
  905. r = readv(ed.misc.bufferIOv.iov, ed.misc.bufferIOv.iovcnt);
  906. break;
  907. case Operations::readAll:
  908. r = read(((char*) ed.misc.bufferIO.buf) + ed.misc.bufferIO.len_done,
  909. ed.misc.bufferIO.len - ed.misc.bufferIO.len_done);
  910. if (r <= 0) {
  911. if (r < 0 && isWouldBlock()) return false;
  912. ed.state = EventHandlerData::States::invalid;
  913. if (ed.cb != nullptr) ed.cb(
  914. ed.misc.bufferIO.len_done == 0 ? r : ed.misc.bufferIO.len_done);
  915. return true;
  916. }
  917. ed.misc.bufferIO.len_done += r;
  918. if (ed.misc.bufferIO.len_done >= ed.misc.bufferIO.len) {
  919. ed.state = EventHandlerData::States::invalid;
  920. if (ed.cb != nullptr) ed.cb(ed.misc.bufferIO.len_done);
  921. }
  922. return true;
  923. case Operations::write:
  924. r = write(ed.misc.bufferIO.buf, ed.misc.bufferIO.len);
  925. break;
  926. case Operations::writev:
  927. r = writev(ed.misc.bufferIOv.iov, ed.misc.bufferIOv.iovcnt);
  928. break;
  929. case Operations::writeAll:
  930. r = write(((char*) ed.misc.bufferIO.buf) + ed.misc.bufferIO.len_done,
  931. ed.misc.bufferIO.len - ed.misc.bufferIO.len_done);
  932. //cout << "wrote " << r << " bytes on fd " << handle << endl;
  933. if (r <= 0) {
  934. if (r < 0 && isWouldBlock()) return false;
  935. ed.state = EventHandlerData::States::invalid;
  936. if (ed.cb != nullptr) ed.cb(
  937. ed.misc.bufferIO.len_done == 0 ? r : ed.misc.bufferIO.len_done);
  938. return true;
  939. }
  940. ed.misc.bufferIO.len_done += r;
  941. //cout << "len_done = " << ed.misc.bufferIO.len_done
  942. // << " of " << ed.misc.bufferIO.len << endl;
  943. if (ed.misc.bufferIO.len_done >= ed.misc.bufferIO.len) {
  944. ed.state = EventHandlerData::States::invalid;
  945. if (ed.cb != nullptr) ed.cb(ed.misc.bufferIO.len_done);
  946. }
  947. return true;
  948. case Operations::recv:
  949. r = recv(ed.misc.bufferIO.buf, ed.misc.bufferIO.len, ed.misc.bufferIO.flags);
  950. break;
  951. case Operations::recvAll:
  952. r = recv(((char*) ed.misc.bufferIO.buf) + ed.misc.bufferIO.len_done,
  953. ed.misc.bufferIO.len - ed.misc.bufferIO.len_done, ed.misc.bufferIO.flags);
  954. if (r <= 0) {
  955. if (r < 0 && isWouldBlock()) return false;
  956. ed.state = EventHandlerData::States::invalid;
  957. if (ed.cb != nullptr) ed.cb(
  958. ed.misc.bufferIO.len_done == 0 ? r : ed.misc.bufferIO.len_done);
  959. return true;
  960. }
  961. ed.misc.bufferIO.len_done += r;
  962. if (ed.misc.bufferIO.len_done >= ed.misc.bufferIO.len) {
  963. ed.state = EventHandlerData::States::invalid;
  964. if (ed.cb != nullptr) ed.cb(ed.misc.bufferIO.len_done);
  965. }
  966. return true;
  967. case Operations::send:
  968. r = send(ed.misc.bufferIO.buf, ed.misc.bufferIO.len, ed.misc.bufferIO.flags);
  969. break;
  970. case Operations::sendAll:
  971. r = send(((char*) ed.misc.bufferIO.buf) + ed.misc.bufferIO.len_done,
  972. ed.misc.bufferIO.len - ed.misc.bufferIO.len_done, ed.misc.bufferIO.flags);
  973. if (r <= 0) {
  974. if (r < 0 && isWouldBlock()) return false;
  975. ed.state = EventHandlerData::States::invalid;
  976. if (ed.cb != nullptr) ed.cb(
  977. ed.misc.bufferIO.len_done == 0 ? -1 : ed.misc.bufferIO.len_done);
  978. return true;
  979. }
  980. ed.misc.bufferIO.len_done += r;
  981. if (ed.misc.bufferIO.len_done >= ed.misc.bufferIO.len) {
  982. ed.state = EventHandlerData::States::invalid;
  983. if (ed.cb != nullptr) ed.cb(ed.misc.bufferIO.len_done);
  984. }
  985. return true;
  986. case Operations::close:
  987. if (!confident && (checkEvents(event) & event) != event) return false;
  988. close();
  989. break;
  990. case Operations::none:
  991. if (!confident && (checkEvents(event) & event) != event) return false;
  992. if (evtd.error || evtd.hungUp) r = -1;
  993. break;
  994. default:
  995. break;
  996. }
  997. if (r < 0 && isWouldBlock()) return false;
  998. //micro-optimization: assume that the above syscalls will return -1 if there is
  999. //an error or hang-up condition
  1000. if ((r <= 0 && op != Operations::none) /*|| evtd.error || evtd.hungUp*/) {
  1001. //invalidate the current event listener
  1002. asdf: ed.state = EventHandlerData::States::invalid;
  1003. }
  1004. bool* del = deletionFlag;
  1005. if (ed.cb != nullptr) ed.cb(r);
  1006. if (*del) return true;
  1007. if (ed.state == EventHandlerData::States::repeat) {
  1008. confident = false;
  1009. goto redo;
  1010. }
  1011. return true;
  1012. }
  1013. bool File::dispatch(Events event, const EventData& evtd, bool confident, bool& deletionFlag) {
  1014. //cout << (int32_t)event << " dispatched" << endl;
  1015. EventHandlerData& ed = eventData[eventToIndex(event)];
  1016. if (ed.state == EventHandlerData::States::invalid) return true;
  1017. EventHandlerData::States oldstate = ed.state;
  1018. if (ed.state == EventHandlerData::States::once) ed.state = EventHandlerData::States::invalid;
  1019. dispatching = true;
  1020. try {
  1021. if (!doOperation(event, ed, evtd, oldstate, confident)) {
  1022. dispatching = false;
  1023. ed.state = oldstate;
  1024. return false;
  1025. }
  1026. } catch (const CancelException& ex) {
  1027. ed.state = EventHandlerData::States::invalid;
  1028. }
  1029. if (deletionFlag) return true;
  1030. dispatching = false;
  1031. return true;
  1032. }
  1033. Events File::dispatchMultiple(Events events, Events confident, const EventData& evtd) {
  1034. preDispatchEvents = _getEvents();
  1035. dispatching = true;
  1036. Events ret = Events::none;
  1037. bool d = false;
  1038. this->deletionFlag = &d;
  1039. for (int32_t i = 0; i < numEvents; i++) {
  1040. Events e = indexToEvent(i);
  1041. //cout << (int32_t)e << " " << (((event_t)e)&((event_t)events)) << endl;
  1042. if ((((event_t) e) & ((event_t) events)) == (event_t) e) {
  1043. EventHandlerData& ed = eventData[i];
  1044. if (ed.state == EventHandlerData::States::invalid) continue;
  1045. if (ed.opcb != nullptr) {
  1046. if (ed.opcb(e, ed, evtd, (confident & e) == e)) ret |= e;
  1047. if (d) break;
  1048. continue;
  1049. }
  1050. EventHandlerData::States oldstate = ed.state;
  1051. if (ed.state == EventHandlerData::States::once) ed.state =
  1052. EventHandlerData::States::invalid;
  1053. try {
  1054. if (doOperation(e, ed, evtd, oldstate, (confident & e) == e)) {
  1055. ret |= e;
  1056. if (d) break;
  1057. } else {
  1058. if (d) break;
  1059. ed.state = oldstate;
  1060. }
  1061. } catch (const CancelException& ex) {
  1062. if (d) break;
  1063. ed.state = EventHandlerData::States::invalid;
  1064. }
  1065. //if (dispatch(e, evtd, (confident & e) == e, d)) ret |= e;
  1066. }
  1067. }
  1068. if (d) return ret;
  1069. this->deletionFlag = NULL;
  1070. dispatching = false;
  1071. return ret;
  1072. }
  1073. void File::fillIOEventHandlerData(EventHandlerData* ed, void* buf, int32_t len,
  1074. const Callback& cb, Events e, Operations op) {
  1075. ed->cb = cb;
  1076. ed->misc.bufferIO.buf = buf;
  1077. ed->misc.bufferIO.len = len;
  1078. ed->op = op;
  1079. }
  1080. void File::fillIOEventHandlerData(EventHandlerData* ed, iovec* iov, int iovcnt,
  1081. const Callback& cb, Events e, Operations op) {
  1082. ed->cb = cb;
  1083. ed->misc.bufferIOv.iov = iov;
  1084. ed->misc.bufferIOv.iovcnt = iovcnt;
  1085. ed->op = op;
  1086. }
  1087. bool File_doRead(File* This, Events event, EventHandlerData& ed, const EventData& evtd,
  1088. bool confident) {
  1089. int r = ::read(This->handle, ed.misc.bufferIO.buf, ed.misc.bufferIO.len);
  1090. if (r < 0 && isWouldBlock()) return false;
  1091. if (ed.state == EventHandlerData::States::once || r <= 0) ed.state =
  1092. EventHandlerData::States::invalid;
  1093. ed.cb(r);
  1094. return true;
  1095. }
  1096. bool File_doWritev(File* This, Events event, EventHandlerData& ed, const EventData& evtd,
  1097. bool confident) {
  1098. int r = ::writev(This->handle, ed.misc.bufferIOv.iov, ed.misc.bufferIOv.iovcnt);
  1099. if (r < 0 && isWouldBlock()) return false;
  1100. if (ed.state == EventHandlerData::States::once || r <= 0) ed.state =
  1101. EventHandlerData::States::invalid;
  1102. ed.cb(r);
  1103. return true;
  1104. }
  1105. void File::read(void* buf, int32_t len, const Callback& cb, bool repeat) {
  1106. if (!_supportsEPoll) {
  1107. asdfg: int32_t r = read(buf, len);
  1108. cb(r);
  1109. if (repeat && r > 0) goto asdfg;
  1110. return;
  1111. }
  1112. static const Events e = Events::in;
  1113. EventHandlerData* ed = beginAddEvent(e);
  1114. fillIOEventHandlerData(ed, buf, len, cb, e, Operations::read);
  1115. ed->opcb= {&File_doRead,this};
  1116. endAddEvent(e, repeat);
  1117. }
  1118. void File::readAll(void* buf, int32_t len, const Callback& cb) {
  1119. if (!_supportsEPoll) {
  1120. int32_t r = Stream::readAll(buf, len);
  1121. cb(r);
  1122. return;
  1123. }
  1124. static const Events e = Events::in;
  1125. EventHandlerData* ed = beginAddEvent(e);
  1126. fillIOEventHandlerData(ed, (void*) buf, len, cb, e, Operations::readAll);
  1127. ed->misc.bufferIO.len_done = 0;
  1128. endAddEvent(e, true);
  1129. }
  1130. void File::write(const void* buf, int32_t len, const Callback& cb, bool repeat) {
  1131. if (!_supportsEPoll) {
  1132. asdfg: int32_t r = write(buf, len);
  1133. cb(r);
  1134. if (repeat && r > 0) goto asdfg;
  1135. return;
  1136. }
  1137. static const Events e = Events::out;
  1138. EventHandlerData* ed = beginAddEvent(e);
  1139. fillIOEventHandlerData(ed, (void*) buf, len, cb, e, Operations::write);
  1140. endAddEvent(e, repeat);
  1141. }
  1142. void File::writeAll(const void* buf, int32_t len, const Callback& cb) {
  1143. if (!_supportsEPoll) {
  1144. int32_t bw = 0, bw1 = 0;
  1145. while (bw < len && (bw1 = write(((char*) buf) + bw, len - bw)) > 0)
  1146. bw += bw1;
  1147. cb((bw1 < 0 && bw <= 0) ? -1 : bw);
  1148. return;
  1149. }
  1150. static const Events e = Events::out;
  1151. EventHandlerData* ed = beginAddEvent(e);
  1152. fillIOEventHandlerData(ed, (void*) buf, len, cb, e, Operations::writeAll);
  1153. ed->misc.bufferIO.len_done = 0;
  1154. endAddEvent(e, true);
  1155. }
  1156. void File::recv(void* buf, int32_t len, int32_t flags, const Callback& cb, bool repeat) {
  1157. static const Events e = Events::in;
  1158. EventHandlerData* ed = beginAddEvent(e);
  1159. fillIOEventHandlerData(ed, buf, len, cb, e, Operations::recv);
  1160. ed->misc.bufferIO.flags = flags;
  1161. endAddEvent(e, repeat);
  1162. }
  1163. void File::recvAll(void* buf, int32_t len, int32_t flags, const Callback& cb) {
  1164. if (!_supportsEPoll) {
  1165. int32_t r = recvAll(buf, len);
  1166. cb(r);
  1167. return;
  1168. }
  1169. static const Events e = Events::in;
  1170. EventHandlerData* ed = beginAddEvent(e);
  1171. fillIOEventHandlerData(ed, (void*) buf, len, cb, e, Operations::recvAll);
  1172. ed->misc.bufferIO.len_done = 0;
  1173. endAddEvent(e, true);
  1174. }
  1175. void File::send(const void* buf, int32_t len, int32_t flags, const Callback& cb, bool repeat) {
  1176. static const Events e = Events::out;
  1177. EventHandlerData* ed = beginAddEvent(e);
  1178. fillIOEventHandlerData(ed, (void*) buf, len, cb, e, Operations::send);
  1179. ed->misc.bufferIO.flags = flags;
  1180. endAddEvent(e, repeat);
  1181. }
  1182. void File::sendAll(const void* buf, int32_t len, int32_t flags, const Callback& cb) {
  1183. static const Events e = Events::out;
  1184. EventHandlerData* ed = beginAddEvent(e);
  1185. fillIOEventHandlerData(ed, (void*) buf, len, cb, e, Operations::sendAll);
  1186. ed->misc.bufferIO.len_done = 0;
  1187. ed->misc.bufferIO.flags = flags;
  1188. endAddEvent(e, true);
  1189. }
  1190. File::~File() {
  1191. if (deletionFlag != NULL) *deletionFlag = true;
  1192. if (handle < 0) return;
  1193. close();
  1194. }
  1195. void File::close() {
  1196. //if(handle<0)throw runtime_error("asdf");
  1197. if (onClose != nullptr) onClose(*this);
  1198. ::close(handle);
  1199. deinit();
  1200. }
  1201. void File::flush() {
  1202. }
  1203. void File::close(const Callback& cb) {
  1204. if (!_supportsEPoll) {
  1205. close();
  1206. cb(0);
  1207. return;
  1208. }
  1209. static const Events e = Events::out;
  1210. EventHandlerData* ed = beginAddEvent(e);
  1211. ed->cb = cb;
  1212. ed->op = Operations::close;
  1213. endAddEvent(e, true);
  1214. }
  1215. void File::flush(const Callback& cb) {
  1216. cb(0);
  1217. }
  1218. void File::cancelRead() {
  1219. cancel(Events::in);
  1220. }
  1221. void File::cancelWrite() {
  1222. cancel(Events::out);
  1223. }
  1224. void File::waitForEvent(Events event, const Callback& cb, bool repeat) {
  1225. EventHandlerData* ed = beginAddEvent(event);
  1226. ed->cb = cb;
  1227. ed->op = Operations::none;
  1228. endAddEvent(event, repeat);
  1229. }
  1230. int32_t File::readv(iovec* iov, int iovcnt) {
  1231. return ::readv(handle, iov, iovcnt);
  1232. }
  1233. int32_t File::writev(iovec* iov, int iovcnt) {
  1234. return ::writev(handle, iov, iovcnt);
  1235. }
  1236. void File::readv(iovec* iov, int iovcnt, const Callback& cb, bool repeat) {
  1237. if (!_supportsEPoll) {
  1238. asdfg: int32_t r = readv(iov, iovcnt);
  1239. cb(r);
  1240. if (repeat && r > 0) goto asdfg;
  1241. return;
  1242. }
  1243. static const Events e = Events::in;
  1244. EventHandlerData* ed = beginAddEvent(e);
  1245. fillIOEventHandlerData(ed, iov, iovcnt, cb, e, Operations::readv);
  1246. endAddEvent(e, repeat);
  1247. }
  1248. void File::writev(iovec* iov, int iovcnt, const Callback& cb, bool repeat) {
  1249. if (!_supportsEPoll) {
  1250. asdfg: int32_t r = writev(iov, iovcnt);
  1251. cb(r);
  1252. if (repeat && r > 0) goto asdfg;
  1253. return;
  1254. }
  1255. static const Events e = Events::out;
  1256. EventHandlerData* ed = beginAddEvent(e);
  1257. fillIOEventHandlerData(ed, iov, iovcnt, cb, e, Operations::writev);
  1258. ed->opcb= {&File_doWritev,this};
  1259. endAddEvent(e, repeat);
  1260. }
  1261. //Socket
  1262. Socket::Socket() :
  1263. addressFamily(AF_UNSPEC), type(0), protocol(0) {
  1264. }
  1265. Socket::Socket(HANDLE handle, int32_t d, int32_t t, int32_t p) {
  1266. init(handle, d, t, p);
  1267. }
  1268. Socket::Socket(int32_t d, int32_t t, int32_t p) {
  1269. init(d, t, p);
  1270. }
  1271. void Socket::init(HANDLE handle, int32_t d, int32_t t, int32_t p) {
  1272. File::init(handle);
  1273. addressFamily = d;
  1274. type = t;
  1275. protocol = p;
  1276. }
  1277. void Socket::init(int32_t d, int32_t t, int32_t p) {
  1278. File::init(socket(d, t | SOCK_CLOEXEC | SOCK_NONBLOCK, p));
  1279. addressFamily = d;
  1280. type = t;
  1281. protocol = p;
  1282. }
  1283. //the caller must release() or free() the returned object
  1284. EndPoint* Socket::getLocalEndPoint() {
  1285. EndPoint* ep = EndPoint::create(addressFamily);
  1286. socklen_t l = (socklen_t) (ep->getSockAddrSize());
  1287. char addr[l];
  1288. getsockname(handle, (struct sockaddr*) addr, &l);
  1289. ep->setSockAddr((struct sockaddr*) addr);
  1290. return ep;
  1291. }
  1292. //the caller must release() or free() the returned object
  1293. EndPoint* Socket::getRemoteEndPoint() {
  1294. EndPoint* ep = EndPoint::create(addressFamily);
  1295. socklen_t l = (socklen_t) (ep->getSockAddrSize());
  1296. char addr[l];
  1297. getpeername(handle, (struct sockaddr*) addr, &l);
  1298. ep->setSockAddr((struct sockaddr*) addr);
  1299. return ep;
  1300. }
  1301. bool Socket::doOperation(Events event, EventHandlerData& ed, const EventData& evtd,
  1302. EventHandlerData::States oldstate, bool confident) {
  1303. Operations op = ed.op;
  1304. int r;
  1305. redo: r = 0;
  1306. switch (op) {
  1307. case Operations::accept:
  1308. {
  1309. HANDLE h = acceptHandle();
  1310. if (h < 0) {
  1311. if (isWouldBlock()) return false;
  1312. ed.state = EventHandlerData::States::invalid;
  1313. }
  1314. ed.cb(h);
  1315. goto success;
  1316. }
  1317. case Operations::shutdown:
  1318. if (!confident && (checkEvents(event) & event) != event) return false;
  1319. ed.cb(shutdown(ed.misc.shutdown.how));
  1320. return true;
  1321. case Operations::connect:
  1322. if (evtd.error || evtd.hungUp) {
  1323. ed.state = EventHandlerData::States::invalid;
  1324. ed.cb(-1);
  1325. return true;
  1326. }
  1327. if (!confident && (checkEvents(event) & event) != event) return false;
  1328. ed.cb(0);
  1329. goto success;
  1330. case Operations::sendTo:
  1331. r = sendTo(ed.misc.bufferIO.buf, ed.misc.bufferIO.len, ed.misc.bufferIO.flags,
  1332. *ed.misc.bufferIO.const_ep);
  1333. break;
  1334. case Operations::recvFrom:
  1335. r = recvFrom(ed.misc.bufferIO.buf, ed.misc.bufferIO.len, ed.misc.bufferIO.flags,
  1336. *ed.misc.bufferIO.ep);
  1337. break;
  1338. default:
  1339. return File::doOperation(event, ed, evtd, oldstate, confident);
  1340. }
  1341. if (r < 0 && isWouldBlock()) return false;
  1342. if (r <= 0) {
  1343. ed.state = EventHandlerData::States::invalid;
  1344. }
  1345. if (ed.cb != nullptr) ed.cb(r);
  1346. success: if (oldstate == EventHandlerData::States::repeat) {
  1347. confident = false;
  1348. goto redo;
  1349. }
  1350. return true;
  1351. }
  1352. void Socket::bind(const sockaddr *addr, int32_t addr_size) {
  1353. if (handle == -1) init(addr->sa_family, SOCK_STREAM, 0);
  1354. int32_t tmp12345 = 1;
  1355. setsockopt(handle, SOL_SOCKET, SO_REUSEADDR, &tmp12345, sizeof(tmp12345));
  1356. if (::bind(handle, addr, addr_size) != 0) throw CPollException(errno);
  1357. }
  1358. void Socket::bind(const EndPoint &ep) {
  1359. int32_t size = ep.getSockAddrSize();
  1360. uint8_t tmp[size];
  1361. ep.getSockAddr((sockaddr*) tmp);
  1362. bind((sockaddr*) tmp, size);
  1363. }
  1364. void Socket::bind(const char* hostname, const char* port, int32_t family, int32_t socktype,
  1365. int32_t proto, int32_t flags, Callback initsock) {
  1366. //XXX
  1367. if (handle != -1) throw CPollException(
  1368. "Socket::bind(string, ...) creates a socket, but the socket is already initialized");
  1369. auto hosts = EndPoint::lookupHost(hostname, port, 0, socktype, proto);
  1370. unsigned int i;
  1371. for (i = 0; i < hosts.size(); i++) {
  1372. int _f = socket(hosts[i]->addressFamily, socktype | SOCK_CLOEXEC | SOCK_NONBLOCK, proto);
  1373. if (_f < 0) continue;
  1374. int32_t tmp12345 = 1;
  1375. setsockopt(_f, SOL_SOCKET, SO_REUSEADDR, &tmp12345, sizeof(tmp12345));
  1376. if (initsock != nullptr) initsock(_f);
  1377. int size = hosts[i]->getSockAddrSize();
  1378. uint8_t tmp[size];
  1379. hosts[i]->getSockAddr((sockaddr*) tmp);
  1380. if (::bind(_f, (sockaddr*) tmp, size) == 0) {
  1381. init(_f, hosts[i]->addressFamily, socktype, proto);
  1382. return;
  1383. } else {
  1384. ::close(_f);
  1385. continue;
  1386. }
  1387. }
  1388. throw CPollException("no bindable hosts were found; last error: " + string(strerror(errno)));
  1389. }
  1390. void Socket::listen(int32_t backlog) {
  1391. checkError(::listen(handle, backlog));
  1392. }
  1393. int32_t Socket::shutdown(int32_t how) {
  1394. return ::shutdown(handle, how);
  1395. }
  1396. void Socket::shutdown(int32_t how, const Callback& cb) {
  1397. static const Events e = Events::out;
  1398. EventHandlerData* ed = beginAddEvent(e);
  1399. ed->cb = cb;
  1400. ed->op = Operations::shutdown;
  1401. endAddEvent(e, false);
  1402. }
  1403. void __socket_init_if_not_already(Socket* s, int32_t af) {
  1404. if (s->handle < 0) s->init(af, SOCK_STREAM, 0);
  1405. }
  1406. void Socket::connect(const sockaddr *addr, int32_t addr_size) {
  1407. __socket_init_if_not_already(this, addr->sa_family);
  1408. retry: int32_t tmp = ::connect(handle, addr, addr_size);
  1409. if (tmp != 0 && errno != EINPROGRESS) {
  1410. if (errno == EINTR) goto retry;
  1411. throw CPollException(errno);
  1412. }
  1413. }
  1414. void Socket::connect(const EndPoint &ep) {
  1415. int32_t l = ep.getSockAddrSize();
  1416. char tmp[l];
  1417. ep.getSockAddr((sockaddr*) tmp);
  1418. connect((sockaddr*) tmp, l);
  1419. }
  1420. void Socket::connect(const char* hostname, const char* port, int32_t family, int32_t socktype,
  1421. int32_t proto, int32_t flags) {
  1422. //XXX
  1423. if (handle != -1) throw CPollException(
  1424. "Socket::connect(string, ...) creates a socket, but the socket is already initialized");
  1425. auto hosts = EndPoint::lookupHost(hostname, port, 0, socktype, proto);
  1426. unsigned int i;
  1427. for (i = 0; i < hosts.size(); i++) {
  1428. int _f = socket(hosts[i]->addressFamily, socktype | SOCK_CLOEXEC | SOCK_NONBLOCK, proto);
  1429. if (_f < 0) continue;
  1430. int size = hosts[i]->getSockAddrSize();
  1431. uint8_t tmp[size];
  1432. hosts[i]->getSockAddr((sockaddr*) tmp);
  1433. if (::connect(_f, (sockaddr*) tmp, size) == 0) {
  1434. init(_f, hosts[i]->addressFamily, socktype, proto);
  1435. break;
  1436. } else {
  1437. ::close(_f);
  1438. continue;
  1439. }
  1440. }
  1441. throw CPollException("no reachable hosts were found; last error: " + string(strerror(errno)));
  1442. }
  1443. //the caller must release() or free() the returned object;
  1444. //also this will NOT automatically add the new socket to this Poll instance
  1445. //because the user might want to handle the socket on a different thread
  1446. //which requires a different Poll instance
  1447. Socket* Socket::accept() {
  1448. Socket* sock = new Socket(acceptHandle(), addressFamily, type, protocol);
  1449. return sock;
  1450. }
  1451. HANDLE Socket::acceptHandle() {
  1452. HANDLE h = ::accept4(handle, NULL, NULL, SOCK_CLOEXEC | SOCK_NONBLOCK);
  1453. return h;
  1454. }
  1455. void Socket::connect(const sockaddr* addr, int32_t addr_size, const Callback& cb) {
  1456. __socket_init_if_not_already(this, addr->sa_family);
  1457. checkError(fcntl(handle, F_SETFL, checkError(fcntl(handle, F_GETFL, 0)) | O_NONBLOCK));
  1458. connect(addr, addr_size);
  1459. static const Events e = Events::out;
  1460. EventHandlerData* ed = beginAddEvent(e);
  1461. ed->cb = cb;
  1462. ed->op = Operations::connect;
  1463. endAddEvent(e, false);
  1464. }
  1465. void Socket::connect(const EndPoint& ep, const Callback& cb) {
  1466. __socket_init_if_not_already(this, ep.addressFamily);
  1467. checkError(fcntl(handle, F_SETFL, checkError(fcntl(handle, F_GETFL, 0)) | O_NONBLOCK));
  1468. connect(ep);
  1469. static const Events e = Events::out;
  1470. EventHandlerData* ed = beginAddEvent(e);
  1471. ed->cb = cb;
  1472. ed->op = Operations::connect;
  1473. endAddEvent(e, false);
  1474. }
  1475. void Socket_acceptStub(Socket* th, int32_t i) {
  1476. Socket* s = new Socket((HANDLE) i, th->addressFamily, th->type, th->protocol);
  1477. th->_acceptCB(s);
  1478. }
  1479. void Socket_acceptHandleStub(Socket* th, int32_t i) {
  1480. HANDLE h = i;
  1481. th->_acceptHandleCB(h);
  1482. }
  1483. //user must eventually release() or free() the received object
  1484. void Socket::accept(const Delegate<void(Socket*)>& cb, bool repeat) {
  1485. _acceptCB = cb;
  1486. static const Events e = Events::in;
  1487. EventHandlerData* ed = beginAddEvent(e);
  1488. ed->cb = Callback(&Socket_acceptStub, this);
  1489. ed->op = Operations::accept;
  1490. endAddEvent(e, repeat);
  1491. }
  1492. void Socket::acceptHandle(const Delegate<void(HANDLE)>& cb, bool repeat) {
  1493. _acceptHandleCB = cb;
  1494. static const Events e = Events::in;
  1495. EventHandlerData* ed = beginAddEvent(e);
  1496. ed->cb = Callback(&Socket_acceptHandleStub, this);
  1497. ed->op = Operations::accept;
  1498. endAddEvent(e, repeat);
  1499. }
  1500. int32_t Socket::recvFrom(void* buf, int32_t len, int32_t flags, EndPoint& ep) {
  1501. socklen_t size = ep.getSockAddrSize();
  1502. uint8_t addr[size];
  1503. //ep->GetSockAddr((sockaddr*)tmp);
  1504. int tmp = recvfrom(handle, buf, len, flags, (sockaddr*) addr, &size);
  1505. checkError(tmp);
  1506. ep.setSockAddr((sockaddr*) addr);
  1507. return tmp;
  1508. }
  1509. int32_t Socket::sendTo(const void* buf, int32_t len, int32_t flags, const EndPoint& ep) {
  1510. socklen_t size = ep.getSockAddrSize();
  1511. uint8_t addr[size];
  1512. ep.getSockAddr((sockaddr*) addr);
  1513. int tmp = sendto(handle, buf, len, flags, (sockaddr*) addr, size);
  1514. return checkError(tmp);
  1515. }
  1516. void Socket::recvFrom(void* buf, int32_t len, int32_t flags, EndPoint& ep, const Callback& cb,
  1517. bool repeat) {
  1518. static const Events e = Events::in;
  1519. EventHandlerData* ed = beginAddEvent(e);
  1520. fillIOEventHandlerData(ed, buf, len, cb, e, Operations::recvFrom);
  1521. ed->misc.bufferIO.flags = flags;
  1522. ed->misc.bufferIO.ep = &ep;
  1523. endAddEvent(e, repeat);
  1524. }
  1525. void Socket::sendTo(const void* buf, int32_t len, int32_t flags, const EndPoint& ep,
  1526. const Callback& cb, bool repeat) {
  1527. static const Events e = Events::out;
  1528. EventHandlerData* ed = beginAddEvent(e);
  1529. fillIOEventHandlerData(ed, (void*) buf, len, cb, e, Operations::sendTo);
  1530. ed->misc.bufferIO.flags = flags;
  1531. ed->misc.bufferIO.const_ep = &ep;
  1532. endAddEvent(e, repeat);
  1533. }
  1534. //SignalFD
  1535. int32_t SignalFD::MAX_EVENTS(4);
  1536. SignalFD::SignalFD(HANDLE handle, const sigset_t& mask) :
  1537. Handle(handle), mask(mask) {
  1538. }
  1539. SignalFD::SignalFD(const sigset_t& mask, int32_t flags) :
  1540. Handle(signalfd(-1, &mask, flags | SFD_CLOEXEC | SFD_NONBLOCK)), mask(mask) {
  1541. }
  1542. bool SignalFD::dispatch(Events event, const EventData& evtd, bool confident) {
  1543. Signal sig[MAX_EVENTS];
  1544. int32_t br = ::read(handle, sig, sizeof(sig));
  1545. if (br < 0 && isWouldBlock()) return false;
  1546. if (callback != nullptr) {
  1547. br /= sizeof(Signal);
  1548. for (int32_t i = 0; i < br; i++) {
  1549. callback(sig[i]);
  1550. }
  1551. }
  1552. return true;
  1553. }
  1554. Events SignalFD::getEvents() {
  1555. return Events::in;
  1556. }
  1557. //Timer
  1558. static void Timer_doinit(Timer* This) {
  1559. This->dispatching = false;
  1560. This->deletionFlag = NULL;
  1561. }
  1562. static void Timer_doSetInterval(Timer* This, struct timespec interval) {
  1563. This->interval = interval;
  1564. struct itimerspec tmp1;
  1565. tmp1.it_interval = interval;
  1566. tmp1.it_value = interval;
  1567. timerfd_settime(This->handle, 0, &tmp1, NULL);
  1568. }
  1569. static void Timer_doSetInterval(Timer* This, uint64_t interval_ms) {
  1570. This->interval.tv_sec = interval_ms / 1000;
  1571. This->interval.tv_nsec = (interval_ms % 1000) * 1000000;
  1572. struct itimerspec tmp1;
  1573. tmp1.it_interval = This->interval;
  1574. tmp1.it_value = This->interval;
  1575. timerfd_settime(This->handle, 0, &tmp1, NULL);
  1576. }
  1577. void Timer::setInterval(struct timespec interval) {
  1578. bool r;
  1579. if (!dispatching) r = running();
  1580. Timer_doSetInterval(this, interval);
  1581. if (!dispatching && running() != r) {
  1582. if (onEventsChange != nullptr) onEventsChange(*this, r ? Events::in : Events::none);
  1583. }
  1584. }
  1585. void Timer::setInterval(uint64_t interval_ms) {
  1586. bool r;
  1587. if (!dispatching) r = running();
  1588. Timer_doSetInterval(this, interval_ms);
  1589. if (!dispatching && running() != r) {
  1590. if (onEventsChange != nullptr) onEventsChange(*this, r ? Events::in : Events::none);
  1591. }
  1592. }
  1593. void Timer::init(HANDLE handle, struct timespec interval) {
  1594. Handle::init(handle);
  1595. Timer_doinit(this);
  1596. setInterval(interval);
  1597. }
  1598. void Timer::init(HANDLE handle, uint64_t interval_ms) {
  1599. Handle::init(handle);
  1600. Timer_doinit(this);
  1601. setInterval(interval_ms);
  1602. }
  1603. void Timer::init(struct timespec interval) {
  1604. Handle::init(timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK));
  1605. Timer_doinit(this);
  1606. setInterval(interval);
  1607. }
  1608. void Timer::init(uint64_t interval_ms) {
  1609. Handle::init(timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK));
  1610. Timer_doinit(this);
  1611. setInterval(interval_ms);
  1612. }
  1613. Timer::Timer(HANDLE handle, uint64_t interval_ms) {
  1614. this->interval= {0,0};
  1615. init(handle, interval_ms);
  1616. }
  1617. Timer::Timer(HANDLE handle, struct timespec interval) {
  1618. this->interval= {0,0};
  1619. init(handle, interval);
  1620. }
  1621. Timer::Timer(uint64_t interval_ms) {
  1622. this->interval= {0,0};
  1623. init(interval_ms);
  1624. }
  1625. Timer::Timer(struct timespec interval) {
  1626. this->interval= {0,0};
  1627. init(interval);
  1628. }
  1629. struct timespec Timer::getInterval() {
  1630. return interval;
  1631. }
  1632. bool Timer::running() {
  1633. return !(interval.tv_nsec == 0 && interval.tv_sec == 0);
  1634. }
  1635. void Timer::setCallback(const Callback& cb) {
  1636. this->cb = cb;
  1637. }
  1638. bool Timer::dispatch(Events event, const EventData& evtd, bool confident) {
  1639. if (event == Events::in) {
  1640. dispatching = true;
  1641. //bool r = running();
  1642. uint64_t tmp;
  1643. bool d(false);
  1644. this->deletionFlag = &d;
  1645. int i;
  1646. if ((i = read(handle, &tmp, sizeof(tmp))) >= (int) sizeof(tmp) && cb != nullptr) cb(
  1647. (int) tmp);
  1648. else if (i < 0 && isWouldBlock()) {
  1649. this->deletionFlag = NULL;
  1650. dispatching = false;
  1651. return false;
  1652. }
  1653. if (d) return true;
  1654. dispatching = false;
  1655. deletionFlag = NULL;
  1656. return true;
  1657. }
  1658. return true;
  1659. }
  1660. void Timer::init(HANDLE handle) {
  1661. Handle::init(handle);
  1662. struct itimerspec tmp;
  1663. timerfd_gettime(handle, &tmp);
  1664. interval = tmp.it_interval;
  1665. if (running() && onEventsChange != nullptr) onEventsChange(*this, Events::none);
  1666. }
  1667. Timer::Timer(HANDLE handle) {
  1668. init(handle);
  1669. }
  1670. void Timer::close() {
  1671. if (onClose != nullptr) onClose(*this);
  1672. ::close(handle);
  1673. handle = -1;
  1674. deinit();
  1675. }
  1676. Timer::~Timer() {
  1677. if (deletionFlag != NULL) *deletionFlag = true;
  1678. if (handle < 0) return;
  1679. close();
  1680. }
  1681. Events Timer::getEvents() {
  1682. return running() ? Events::in : Events::none;
  1683. }
  1684. //EventFD
  1685. EventFD::EventFD(HANDLE handle) :
  1686. File(handle) {
  1687. }
  1688. EventFD::EventFD(uint32_t initval, int32_t flags) :
  1689. File(eventfd(initval, flags | EFD_CLOEXEC | EFD_NONBLOCK)) {
  1690. }
  1691. bool EventFD::doOperation(Events event, EventHandlerData& ed, const EventData& evtd,
  1692. EventHandlerData::States oldstate, bool confident) {
  1693. int32_t r = 0;
  1694. switch (ed.op) {
  1695. case Operations::read:
  1696. r = eventfd_read(handle, &ed.misc.eventfd.evt);
  1697. break;
  1698. case Operations::write:
  1699. r = eventfd_write(handle, ed.misc.eventfd.evt);
  1700. break;
  1701. default:
  1702. break;
  1703. }
  1704. if (r < 0 && isWouldBlock()) return false;
  1705. ed.cb(r);
  1706. return true;
  1707. }
  1708. eventfd_t EventFD::getEvent() {
  1709. eventfd_t tmp;
  1710. if (eventfd_read(handle, &tmp) == 0) return tmp;
  1711. return -1;
  1712. }
  1713. void EventFD_getEventStub(EventFD* th, int i) {
  1714. th->cb((i < 0) ? -1 : (th->eventData[eventToIndex(Events::in)].misc.eventfd.evt));
  1715. }
  1716. void EventFD::getEvent(const Delegate<void(eventfd_t)>& cb, bool repeat) {
  1717. Events e = Events::in;
  1718. EventHandlerData* ed = beginAddEvent(e);
  1719. this->cb = cb;
  1720. ed->cb = Callback(&EventFD_getEventStub, this);
  1721. ed->op = Operations::read;
  1722. endAddEvent(e, repeat);
  1723. }
  1724. int32_t EventFD::sendEvent(eventfd_t evt) {
  1725. return eventfd_write(handle, evt);
  1726. }
  1727. void EventFD::sendEvent(eventfd_t evt, const Delegate<void(int32_t)>& cb) {
  1728. Events e = Events::out;
  1729. EventHandlerData* ed = beginAddEvent(e);
  1730. ed->cb = cb;
  1731. ed->misc.eventfd.evt = evt;
  1732. ed->op = Operations::write;
  1733. endAddEvent(e, false);
  1734. }
  1735. //EPoll
  1736. static inline void fillEPollEvents(Handle& h, epoll_event& evt, Events e) {
  1737. evt.events = eventsToEPoll(e);
  1738. evt.data.u64 = 0; //work around valgrind warning
  1739. evt.data.ptr = &h;
  1740. }
  1741. int32_t EPoll::MAX_EVENTS(32);
  1742. EPoll::EPoll(HANDLE handle) :
  1743. Handle(handle), curEvents(NULL), active(0), cur_handle(-1) {
  1744. disableSignals();
  1745. }
  1746. EPoll::EPoll() :
  1747. Handle(checkError(epoll_create1(EPOLL_CLOEXEC))), curEvents(NULL), active(0),
  1748. cur_handle(-1) {
  1749. disableSignals();
  1750. }
  1751. void EPoll_disableHandle(EPoll* This, Handle& h) {
  1752. Events new_e = h.getEvents();
  1753. h._supportsEPoll = false;
  1754. EventData evtd;
  1755. evtd.hungUp = evtd.error = false;
  1756. while (new_e != Events::none) {
  1757. h.dispatchMultiple(new_e, new_e, evtd);
  1758. new_e = h.getEvents();
  1759. }
  1760. }
  1761. static inline void EPoll_applyHandle(EPoll* This, Handle& h, Events old_e) {
  1762. if (!h._supportsEPoll) {
  1763. //if (debug) printf("_applyHandle: h=%i, h._supportsEPoll=false\n", h.handle);
  1764. return;
  1765. }
  1766. //if (unlikely(has_deleted) && tmp_deleted.find(&h) != tmp_deleted.end()) return;
  1767. Events new_e = h.getEvents();
  1768. //if (debug) printf("_applyHandle: h=%i, old_e=%i, new_e=%i\n", h.handle, old_e, new_e);
  1769. if (new_e == old_e) return;
  1770. epoll_event evt;
  1771. if (old_e == Events::none) {
  1772. fillEPollEvents(h, evt, new_e);
  1773. //cout << "added " << h.handle << endl;
  1774. int r = epoll_ctl(This->handle, EPOLL_CTL_ADD, h.handle, &evt);
  1775. if (r < 0 && errno == EPERM) {
  1776. EPoll_disableHandle(This, h);
  1777. return;
  1778. }
  1779. checkError(r);
  1780. This->active++;
  1781. } else if (new_e == Events::none) {
  1782. //cout << "deleted " << h.handle << endl;
  1783. //checkError(epoll_ctl(this->handle, EPOLL_CTL_DEL, h.handle, NULL));
  1784. //XXX: removed error checking to work around cURL bug
  1785. epoll_ctl(This->handle, EPOLL_CTL_DEL, h.handle, NULL);
  1786. if (likely(This->curEvents!=NULL)) for (int i = This->curIndex; i < This->curLength; i++) {
  1787. if (This->curEvents[i].data.ptr == (void*) &h) This->curEvents[i].data.ptr = NULL;
  1788. }
  1789. This->active--;
  1790. } else {
  1791. fillEPollEvents(h, evt, new_e);
  1792. //cout << "modified " << h.handle << endl;
  1793. //printf("epoll_ctl: old_e=%i new_e=%i\n", old_e, new_e);
  1794. checkError(epoll_ctl(This->handle, EPOLL_CTL_MOD, h.handle, &evt));
  1795. uint32_t ep_e = eventsToEPoll(new_e);
  1796. if (likely(This->curEvents!=NULL)) for (int i = This->curIndex; i < This->curLength; i++) {
  1797. if (This->curEvents[i].data.ptr == (void*) &h) {
  1798. This->curEvents[i].events &= ep_e;
  1799. if (This->curEvents[i].events == 0) This->curEvents[i].data.ptr = NULL;
  1800. }
  1801. }
  1802. }
  1803. }
  1804. static inline int32_t EPoll_doDispatch(EPoll* This, const epoll_event& event) {
  1805. Handle* h = (Handle*) event.data.ptr;
  1806. if (unlikely(h==NULL)) return 0;
  1807. EventData evtd;
  1808. event_t evt = (event_t) ePollToEvents(event.events);
  1809. evtd.hungUp = (event.events & EPOLLHUP);
  1810. evtd.error = (event.events & EPOLLERR);
  1811. This->cur_handle = h->handle;
  1812. Events old_e = h->getEvents();
  1813. This->cur_deleted = false;
  1814. This->cur_handle = h->handle;
  1815. h->dispatchMultiple((Events) evt, (Events) evt, evtd);
  1816. if (This->cur_deleted) goto aaa;
  1817. if (h->getEvents() != old_e) This->applyHandle(*h, old_e);
  1818. aaa: This->cur_handle = -1;
  1819. return 1;
  1820. }
  1821. int32_t EPoll::_doEPoll(int32_t timeout) {
  1822. if (active <= 0) {
  1823. //printf("active=%i\n", active);
  1824. return -1;
  1825. }
  1826. epoll_event evts[MAX_EVENTS];
  1827. retry: int32_t n = checkError(epoll_wait(handle, evts, MAX_EVENTS, timeout));
  1828. if (unlikely(n < 0)) {
  1829. goto retry;
  1830. }
  1831. curEvents = evts;
  1832. curLength = n;
  1833. for (curIndex = 0; curIndex < n; curIndex++)
  1834. EPoll_doDispatch(this, evts[curIndex]);
  1835. return n;
  1836. }
  1837. bool EPoll::dispatch(Events event, const EventData& evtd, bool confident) {
  1838. return _doEPoll(0) > 0;
  1839. }
  1840. Events EPoll::dispatchMultiple(Events event, Events confident, const EventData& evtd) {
  1841. //throw CPollException("EPoll::dispatch() not implemented");
  1842. return _doEPoll(0) <= 0 ? Events::none : Events::all;
  1843. }
  1844. Events EPoll::getEvents() {
  1845. //throw CPollException("EPoll::getEvents() not implemented");
  1846. return active ? (Events::all) : (Events::none);
  1847. }
  1848. Events EPoll::waitAndDispatch() {
  1849. return _doEPoll(-1) <= 0 ? Events::none : Events::all;
  1850. }
  1851. void EPoll::applyHandle(Handle& h, Events old_e) {
  1852. //cout << "applyHandle" << endl;
  1853. //if (h.handle == cur_handle) return;
  1854. EPoll_applyHandle(this, h, old_e);
  1855. }
  1856. void EPoll::add(Handle& h) {
  1857. //h.retain();
  1858. h.onEventsChange = Delegate<void(Handle&, Events)>(&EPoll::applyHandle, this);
  1859. //h.onEventsChange = [this,&h](Events old_events) {this->applyHandle(h,old_events);};
  1860. EPoll_applyHandle(this, h, Events::none);
  1861. h.onClose = Delegate<void(Handle& h)>(&EPoll::del, this);
  1862. }
  1863. void EPoll::del(Handle& h) {
  1864. //h.release();
  1865. //tmp_deleted.push_back(&h);
  1866. //throw 0;
  1867. //printf("EPoll::del()\n");
  1868. if (h.handle == cur_handle) cur_deleted = true;
  1869. if (h.getEvents() != Events::none) {
  1870. /*if (h.handle < 0) {
  1871. //throw runtime_error("test");
  1872. Events new_e = h.getEvents();
  1873. EventData evtd;
  1874. evtd.hungUp = evtd.error = true;
  1875. while (new_e != Events::none) {
  1876. h.dispatchMultiple(new_e, evtd);
  1877. new_e = h.getEvents();
  1878. }
  1879. }*/
  1880. //printf("EPoll::del()\n");
  1881. //if we're in the middle of a _doEPoll() loop, disable all pending events in queue
  1882. //relating to this handle since it might not even exist anymore after this function
  1883. //returns
  1884. if (likely(curEvents!=NULL)) for (int i = curIndex; i < curLength; i++) {
  1885. if (curEvents[i].data.ptr == (void*) &h) curEvents[i].data.ptr = NULL;
  1886. }
  1887. if (h.handle >= 0) {
  1888. //checkError(epoll_ctl(this->handle, EPOLL_CTL_DEL, h.handle, (epoll_event*) 1));
  1889. //XXX: see previous comment about EPOLL_CTL_DEL
  1890. epoll_ctl(this->handle, EPOLL_CTL_DEL, h.handle, (epoll_event*) 1);
  1891. active--;
  1892. }
  1893. }
  1894. h.onEventsChange = nullptr;
  1895. h.onClose = nullptr;
  1896. }
  1897. //NewEPoll
  1898. int32_t NewEPoll::MAX_EVENTS(32);
  1899. static bool compareDrainInfo(const NewEPoll::drainInfo& a, const NewEPoll::drainInfo& b) {
  1900. return a.h < b.h;
  1901. }
  1902. NewEPoll::NewEPoll(HANDLE h) :
  1903. Handle(h), _draining(NULL), _dispatchingHandle(NULL), _curEvents(NULL) {
  1904. disableSignals();
  1905. }
  1906. NewEPoll::NewEPoll() :
  1907. Handle(checkError(epoll_create1(EPOLL_CLOEXEC))), _draining(NULL),
  1908. _dispatchingHandle(NULL), _curEvents(NULL) {
  1909. disableSignals();
  1910. }
  1911. bool NewEPoll::dispatch(Events event, const EventData& evtd, bool confident) {
  1912. return _doIteration(0);
  1913. }
  1914. Events NewEPoll::dispatchMultiple(Events event, Events confident, const EventData& evtd) {
  1915. return _doIteration(0) ? event : Events::none;
  1916. }
  1917. Events NewEPoll::getEvents() {
  1918. return Events::all;
  1919. }
  1920. Events NewEPoll::waitAndDispatch() {
  1921. return _doIteration(-1) ? Events::all : Events::none;
  1922. }
  1923. void NewEPoll::add(Handle& h) {
  1924. epoll_event evt;
  1925. fillEPollEvents(h, evt, Events::all);
  1926. evt.events |= EPOLLET;
  1927. int r = epoll_ctl(this->handle, EPOLL_CTL_ADD, h.handle, &evt);
  1928. if (r < 0 && errno == EPERM) {
  1929. h._supportsEPoll = false;
  1930. return;
  1931. }
  1932. h.onEventsChange = Delegate<void(Handle&, Events)>(&NewEPoll::_applyHandle, this);
  1933. _queueHandle(h, h.getEvents());
  1934. h.onClose = Delegate<void(Handle& h)>(&NewEPoll::del, this);
  1935. }
  1936. void NewEPoll::del(Handle& h) {
  1937. if (&h == _dispatchingHandle) _dispatchingDeleted = true;
  1938. if (likely(_curEvents!=NULL)) for (int i = _curIndex; i < _curLength; i++) {
  1939. if (_curEvents[i].data.ptr == (void*) &h) _curEvents[i].data.ptr = NULL;
  1940. }
  1941. for (uint32_t i = 0; i < _pending.size(); i++)
  1942. if (_pending[i].h == &h) _pending[i].h = NULL;
  1943. if (likely(_draining!=NULL)) for (uint32_t i = 0; i < _draining->size(); i++)
  1944. if ((*_draining)[i].h == &h) (*_draining)[i].h = NULL;
  1945. epoll_ctl(this->handle, EPOLL_CTL_DEL, h.handle, (epoll_event*) 1);
  1946. h.onEventsChange = nullptr;
  1947. h.onClose = nullptr;
  1948. }
  1949. bool NewEPoll::_doIteration(int timeout) {
  1950. bool ret = false;
  1951. while (_pending.size() > 0) {
  1952. vector<drainInfo> tmpevents1 = _pending;
  1953. _draining = &tmpevents1;
  1954. _pending.clear();
  1955. std::sort(tmpevents1.begin(), tmpevents1.end(), compareDrainInfo);
  1956. Handle* last_h = NULL;
  1957. Events last_e = Events::none;
  1958. for (int i = 0; i < (int) tmpevents1.size(); i++) {
  1959. if (tmpevents1[i].h == NULL) continue;
  1960. ret = true;
  1961. if (last_h == tmpevents1[i].h) {
  1962. last_e = last_e | tmpevents1[i].new_e;
  1963. continue;
  1964. }
  1965. if (last_h != NULL) _drainHandle(*last_h, last_e);
  1966. last_h = tmpevents1[i].h;
  1967. last_e = tmpevents1[i].new_e;
  1968. }
  1969. if (last_h != NULL) _drainHandle(*last_h, last_e);
  1970. _draining = NULL;
  1971. }
  1972. epoll_event evts[MAX_EVENTS];
  1973. retry: int32_t n = checkError(epoll_wait(handle, evts, MAX_EVENTS, timeout));
  1974. if (unlikely(n < 0)) {
  1975. goto retry;
  1976. }
  1977. if (n > 0) ret = true;
  1978. _curEvents = evts;
  1979. _curLength = n;
  1980. for (_curIndex = 0; _curIndex < n; _curIndex++)
  1981. _doDispatch(evts[_curIndex]);
  1982. return ret;
  1983. }
  1984. void NewEPoll::_doDispatch(const epoll_event& event) {
  1985. Handle* h = (Handle*) event.data.ptr;
  1986. if (unlikely(h==NULL)) return;
  1987. _dispatchingHandle = h;
  1988. _dispatchingDeleted = false;
  1989. EventData evtd;
  1990. event_t evt = (event_t) ePollToEvents(event.events);
  1991. evt = evt & (event_t) h->getEvents();
  1992. evtd.hungUp = (event.events & EPOLLHUP);
  1993. evtd.error = (event.events & EPOLLERR);
  1994. Events events = h->dispatchMultiple((Events) evt, (Events) evt, evtd);
  1995. if (_dispatchingDeleted) goto aaa;
  1996. event_t failed;
  1997. failed = 0;
  1998. while (true) {
  1999. events = Events((event_t) h->getEvents() & ~failed);
  2000. if (events == Events::none) break;
  2001. event_t res = (event_t) h->dispatchMultiple(events, Events::none, evtd);
  2002. failed |= event_t(events) & ~res;
  2003. if (_dispatchingDeleted) goto aaa;
  2004. }
  2005. //_applyHandle(*h, old_e);
  2006. aaa: _dispatchingHandle = NULL;
  2007. }
  2008. void NewEPoll::_drainHandle(Handle& h, Events new_e) {
  2009. if (new_e != Events::none) {
  2010. EventData evtd;
  2011. evtd.hungUp = evtd.error = false;
  2012. _dispatchingDeleted = false;
  2013. _dispatchingHandle = &h;
  2014. event_t failed;
  2015. failed = 0;
  2016. while (true) {
  2017. Events events = Events((event_t) h.getEvents() & ~failed);
  2018. if (events == Events::none) break;
  2019. event_t res = (event_t) h.dispatchMultiple(events, Events::none, evtd);
  2020. failed |= event_t(events) & ~res;
  2021. if (_dispatchingDeleted) goto out;
  2022. }
  2023. }
  2024. out: _dispatchingHandle = NULL;
  2025. }
  2026. void NewEPoll::_queueHandle(Handle& h, Events new_e) {
  2027. _pending.push_back( { &h, new_e });
  2028. }
  2029. void NewEPoll::_applyHandle(Handle& h, Events old_e) {
  2030. Events new_e = h.getEvents();
  2031. Events new_added = (old_e ^ new_e) & new_e;
  2032. if (new_added != Events::none) _queueHandle(h, new_added);
  2033. }
  2034. StandardStream::StandardStream() :
  2035. in(0), out(1) {
  2036. }
  2037. int32_t StandardStream::read(void* buf, int32_t len) {
  2038. return in.read(buf, len);
  2039. }
  2040. int32_t StandardStream::readAll(void* buf, int32_t len) {
  2041. return in.readAll(buf, len);
  2042. }
  2043. int32_t StandardStream::write(const void* buf, int32_t len) {
  2044. return out.write(buf, len);
  2045. }
  2046. int32_t StandardStream::writeAll(const void* buf, int32_t len) {
  2047. return out.writeAll(buf, len);
  2048. }
  2049. void StandardStream::read(void* buf, int32_t len, const Callback& cb, bool repeat) {
  2050. in.read(buf, len, cb, repeat);
  2051. }
  2052. void StandardStream::readAll(void* buf, int32_t len, const Callback& cb) {
  2053. in.readAll(buf, len, cb);
  2054. }
  2055. void StandardStream::write(const void* buf, int32_t len, const Callback& cb, bool repeat) {
  2056. out.write(buf, len, cb, repeat);
  2057. }
  2058. void StandardStream::writeAll(const void* buf, int32_t len, const Callback& cb) {
  2059. out.writeAll(buf, len, cb);
  2060. }
  2061. void StandardStream::cancelRead() {
  2062. in.cancelRead();
  2063. }
  2064. void StandardStream::cancelWrite() {
  2065. out.cancelWrite();
  2066. }
  2067. void StandardStream::close() {
  2068. }
  2069. void StandardStream::flush() {
  2070. out.flush();
  2071. }
  2072. void StandardStream::close(const Callback& cb) {
  2073. cb(0);
  2074. }
  2075. void StandardStream::flush(const Callback& cb) {
  2076. out.flush(cb);
  2077. }
  2078. FixedMemoryStream::FixedMemoryStream() :
  2079. BufferedOutput(NULL, 0, 0), len(0) {
  2080. }
  2081. FixedMemoryStream::FixedMemoryStream(void* data, int len) :
  2082. BufferedOutput((uint8_t*) data, 0, len), len(0) {
  2083. }
  2084. int32_t FixedMemoryStream::read(void* buf, int32_t len) {
  2085. int l = len < (this->len - this->bufferPos) ? len : (this->len - this->bufferPos);
  2086. if (l <= 0) return 0;
  2087. memcpy(buf, this->buffer + this->bufferPos, l);
  2088. this->bufferPos += l;
  2089. return l;
  2090. }
  2091. int32_t FixedMemoryStream::readAll(void* buf, int32_t len) {
  2092. return read(buf, len);
  2093. }
  2094. int32_t FixedMemoryStream::write(const void* buf, int32_t len) {
  2095. int l = len < (this->len - this->bufferPos) ? len : (this->len - this->bufferPos);
  2096. if (l <= 0) return 0;
  2097. memcpy(this->buffer + this->bufferPos, buf, l);
  2098. this->bufferPos += l;
  2099. return l;
  2100. }
  2101. int32_t FixedMemoryStream::writeAll(const void* buf, int32_t len) {
  2102. if (this->bufferPos + len > this->len) return -1;
  2103. return write(buf, len);
  2104. }
  2105. void FixedMemoryStream::read(void* buf, int32_t len, const Callback& cb, bool repeat) {
  2106. rep: int tmp = read(buf, len);
  2107. cb(tmp);
  2108. if (repeat && tmp > 0) goto rep;
  2109. }
  2110. void FixedMemoryStream::readAll(void* buf, int32_t len, const Callback& cb) {
  2111. int tmp = readAll(buf, len);
  2112. cb(tmp);
  2113. }
  2114. void FixedMemoryStream::write(const void* buf, int32_t len, const Callback& cb, bool repeat) {
  2115. rep: int tmp = write(buf, len);
  2116. cb(tmp);
  2117. if (repeat && tmp > 0) goto rep;
  2118. }
  2119. void FixedMemoryStream::writeAll(const void* buf, int32_t len, const Callback& cb) {
  2120. int tmp = writeAll(buf, len);
  2121. cb(tmp);
  2122. }
  2123. void FixedMemoryStream::cancelRead() {
  2124. }
  2125. void FixedMemoryStream::cancelWrite() {
  2126. }
  2127. void FixedMemoryStream::close() {
  2128. }
  2129. void FixedMemoryStream::flush() {
  2130. }
  2131. void FixedMemoryStream::close(const Callback& cb) {
  2132. cb(0);
  2133. }
  2134. void FixedMemoryStream::flush(const Callback& cb) {
  2135. cb(0);
  2136. }
  2137. int32_t FixedMemoryStream::readBuffer(void*& buf, int32_t maxlen) {
  2138. int l;
  2139. l = this->len - this->bufferPos;
  2140. if (maxlen >= 0 && maxlen < l) l = maxlen;
  2141. if (l <= 0) return 0;
  2142. buf = this->buffer + this->bufferPos;
  2143. this->bufferPos += l;
  2144. return l;
  2145. }
  2146. void FixedMemoryStream::flushBuffer(int minBufferAllocation) {
  2147. if (minBufferAllocation > this->len - this->bufferPos) throw runtime_error(
  2148. "overflowed FixedMemoryStream");
  2149. }
  2150. BufferedOutput* FixedMemoryStream::getBufferedOutput() {
  2151. return this;
  2152. }
  2153. MemoryStream::MemoryStream(int capacity) :
  2154. FixedMemoryStream(malloc(capacity), 0) {
  2155. if (buffer == NULL) throw bad_alloc();
  2156. bufferSize = capacity;
  2157. }
  2158. MemoryStream::~MemoryStream() {
  2159. if (buffer != NULL) free(buffer);
  2160. }
  2161. void MemoryStream::ensureCapacity(int c) {
  2162. if (buffer == NULL) throw runtime_error("attempted to write to closed MemoryStream");
  2163. if (likely(c<=bufferSize)) return;
  2164. int tmp = bufferSize;
  2165. if (tmp <= 0) tmp = 4096;
  2166. while (tmp < c)
  2167. tmp *= 2;
  2168. void* v = realloc(buffer, tmp);
  2169. if (v == NULL) throw bad_alloc();
  2170. buffer = (uint8_t*) v;
  2171. bufferSize = tmp;
  2172. }
  2173. int32_t MemoryStream::write(const void* buf, int32_t len) {
  2174. ensureCapacity(this->bufferPos + len);
  2175. if (this->bufferPos + len > this->len) this->len = this->bufferPos + len;
  2176. return FixedMemoryStream::write(buf, len);
  2177. }
  2178. int32_t MemoryStream::writeAll(const void* buf, int32_t len) {
  2179. /*ensureCapacity(this->bufferSize + len);
  2180. this->bufferPos += len;
  2181. if (this->bufferPos > this->len) this->len = this->bufferPos;
  2182. return FixedMemoryStream::writeAll(buf, len);*/
  2183. return write(buf, len);
  2184. }
  2185. void MemoryStream::close() {
  2186. if (buffer == NULL) return;
  2187. free(buffer);
  2188. buffer = NULL;
  2189. bufferSize = len = 0;
  2190. }
  2191. void MemoryStream::clear() {
  2192. len = 0;
  2193. bufferPos = 0;
  2194. }
  2195. void MemoryStream::flushBuffer(int minBufferAllocation) {
  2196. if (this->bufferPos > this->len) this->len = this->bufferPos;
  2197. ensureCapacity(this->len + minBufferAllocation);
  2198. }
  2199. void MemoryStream::keepBuffer() {
  2200. buffer = NULL;
  2201. }
  2202. StringPool::StringPool(int pageSize) :
  2203. _firstPage(NULL), _curPage(NULL), _firstRawItem(NULL), _curRawItem(NULL),
  2204. _pageSize(pageSize) {
  2205. }
  2206. StringPool::~StringPool() {
  2207. clear();
  2208. if (_firstPage != NULL) {
  2209. ::free(_firstPage);
  2210. }
  2211. }
  2212. void StringPool::clear() {
  2213. _pageHeader* h;
  2214. if (_firstPage != NULL) {
  2215. h = _firstPage->next;
  2216. _firstPage->next = NULL;
  2217. while (h != NULL) {
  2218. _pageHeader* n = h->next;
  2219. ::free(h);
  2220. h = n;
  2221. }
  2222. }
  2223. h = _firstRawItem;
  2224. while (h != NULL) {
  2225. _pageHeader* n = h->next;
  2226. ::free(h);
  2227. h = n;
  2228. }
  2229. _curPage = _firstPage;
  2230. _curIndex = 0;
  2231. _firstRawItem = _curRawItem = NULL;
  2232. }
  2233. void StringPool::_addPage() {
  2234. void* tmp = malloc(_pageSize);
  2235. if (tmp == NULL) throw bad_alloc();
  2236. if (_curPage != NULL) _curPage->next = (_pageHeader*) tmp;
  2237. _curPage = (_pageHeader*) tmp;
  2238. _curPage->next = NULL;
  2239. if (_firstPage == NULL) _firstPage = (_pageHeader*) tmp;
  2240. _curIndex = 0;
  2241. }
  2242. void StringPool::_addRaw(int len) {
  2243. void* tmp = malloc(len + sizeof(_pageHeader));
  2244. if (tmp == NULL) throw bad_alloc();
  2245. if (_curRawItem != NULL) _curRawItem->next = (_pageHeader*) tmp;
  2246. _curRawItem = (_pageHeader*) tmp;
  2247. _curRawItem->next = NULL;
  2248. if (_firstRawItem == NULL) _firstRawItem = (_pageHeader*) tmp;
  2249. }
  2250. StringStream::StringStream() :
  2251. BufferedOutput(NULL, 0, 0), len(0) {
  2252. }
  2253. int32_t StringStream::read(void* buf, int32_t len) {
  2254. int l = len < (this->len - this->bufferPos) ? len : (this->len - this->bufferPos);
  2255. if (l <= 0) return 0;
  2256. memcpy(buf, buffer + this->bufferPos, l);
  2257. this->bufferPos += l;
  2258. return l;
  2259. }
  2260. int32_t StringStream::readAll(void* buf, int32_t len) {
  2261. return read(buf, len);
  2262. }
  2263. int32_t StringStream::write(const void* buf, int32_t len) {
  2264. if (bufferPos + len > this->len) {
  2265. _str.reserve(bufferPos + len);
  2266. _str.resize(_str.capacity());
  2267. this->len = bufferPos + len;
  2268. this->buffer = (uint8_t*) _str.data();
  2269. }
  2270. memcpy(buffer + this->bufferPos, buf, len);
  2271. this->bufferPos += len;
  2272. return len;
  2273. }
  2274. int32_t StringStream::writeAll(const void* buf, int32_t len) {
  2275. return write(buf, len);
  2276. }
  2277. void StringStream::read(void* buf, int32_t len, const Callback& cb, bool repeat) {
  2278. rep: int tmp = read(buf, len);
  2279. cb(tmp);
  2280. if (repeat && tmp > 0) goto rep;
  2281. }
  2282. void StringStream::readAll(void* buf, int32_t len, const Callback& cb) {
  2283. int tmp = readAll(buf, len);
  2284. cb(tmp);
  2285. }
  2286. void StringStream::write(const void* buf, int32_t len, const Callback& cb, bool repeat) {
  2287. rep: int tmp = write(buf, len);
  2288. cb(tmp);
  2289. if (repeat && tmp > 0) goto rep;
  2290. }
  2291. void StringStream::writeAll(const void* buf, int32_t len, const Callback& cb) {
  2292. int tmp = writeAll(buf, len);
  2293. cb(tmp);
  2294. }
  2295. void StringStream::cancelRead() {
  2296. }
  2297. void StringStream::cancelWrite() {
  2298. }
  2299. void StringStream::close() {
  2300. }
  2301. void StringStream::flush() {
  2302. }
  2303. void StringStream::close(const Callback& cb) {
  2304. cb(0);
  2305. }
  2306. void StringStream::flush(const Callback& cb) {
  2307. cb(0);
  2308. }
  2309. int32_t StringStream::readBuffer(void*& buf, int32_t maxlen) {
  2310. int l;
  2311. l = this->len - this->bufferPos;
  2312. if (maxlen >= 0 && maxlen < l) l = maxlen;
  2313. if (l <= 0) return 0;
  2314. buf = this->buffer + this->bufferPos;
  2315. this->bufferPos += l;
  2316. return l;
  2317. }
  2318. void StringStream::flushBuffer(int minBufferAllocation) {
  2319. if (this->bufferPos > this->len) this->len = this->bufferPos;
  2320. _str.reserve(_str.length() + minBufferAllocation);
  2321. _str.resize(_str.capacity());
  2322. this->bufferSize = _str.length();
  2323. buffer = (uint8_t*) _str.data();
  2324. }
  2325. BufferedOutput* StringStream::getBufferedOutput() {
  2326. return this;
  2327. }
  2328. void StringStream::clear() {
  2329. _str.clear();
  2330. }
  2331. void listDirectory(const char* path, Delegate<void(const char*)> cb) {
  2332. DIR* d = opendir(path);
  2333. if (d == NULL) {
  2334. throw runtime_error(strerror(errno));
  2335. return;
  2336. }
  2337. int len = offsetof(dirent, d_name)+ pathconf(path, _PC_NAME_MAX) + 1;
  2338. char ent[len];
  2339. dirent* ent1 = (dirent*) ent;
  2340. while (readdir_r(d, (dirent*) ent, &ent1) == 0 && ent1 != NULL) {
  2341. if (strcmp(ent1->d_name, ".") == 0 || strcmp(ent1->d_name, "..") == 0) continue;
  2342. cb(ent1->d_name);
  2343. }
  2344. closedir(d);
  2345. }
  2346. MemoryPool::MemoryPool(int size, int maxItems) :
  2347. _freeList(NULL), _lastFree(NULL), size(size), items(0), maxItems(maxItems) {
  2348. }
  2349. MemoryPool::~MemoryPool() {
  2350. _item* tmp = _freeList;
  2351. while (tmp != NULL) {
  2352. _item* n = tmp->nextFree;
  2353. ::free(tmp);
  2354. tmp = n;
  2355. }
  2356. }
  2357. void* MemoryPool::alloc() {
  2358. if (_freeList == NULL) {
  2359. _item* tmp = (_item*) malloc(size + sizeof(_item));
  2360. tmp->nextFree = (_item*) this;
  2361. return tmp + 1;
  2362. } else {
  2363. _item* tmp = _freeList;
  2364. _freeList = _freeList->nextFree;
  2365. items--;
  2366. if (tmp == _lastFree) _lastFree = NULL;
  2367. tmp->nextFree = (_item*) this; //for double-free detection
  2368. return (tmp + 1);
  2369. }
  2370. }
  2371. void* MemoryPool::alloc(int s) {
  2372. if (s != size) throw CPollException(
  2373. "attempting to allocate an object of the wrong size from a MemoryPool");
  2374. return alloc();
  2375. }
  2376. void MemoryPool::dealloc(void* obj) {
  2377. _item* o = ((_item*) obj) - 1;
  2378. if (o->nextFree != (_item*) this) throw runtime_error(
  2379. "MemoryPool::free(): double free or corruption");
  2380. if (items > maxItems) {
  2381. ::free(o);
  2382. } else {
  2383. items++;
  2384. o->nextFree = NULL;
  2385. if (_lastFree != NULL) {
  2386. _lastFree->nextFree = o;
  2387. }
  2388. _lastFree = o;
  2389. if (_freeList == NULL) _freeList = o;
  2390. }
  2391. }
  2392. PThreadMutex::PThreadMutex() {
  2393. pthread_mutexattr_t attr;
  2394. pthread_mutexattr_init(&attr);
  2395. pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
  2396. pthread_mutex_init(&m, &attr);
  2397. pthread_mutexattr_destroy(&attr);
  2398. }
  2399. PThreadMutex::~PThreadMutex() {
  2400. pthread_mutex_destroy(&m);
  2401. }
  2402. void PThreadMutex::lock() {
  2403. pthread_mutex_lock(&m);
  2404. }
  2405. void PThreadMutex::unlock() {
  2406. pthread_mutex_unlock(&m);
  2407. }
  2408. }