cpoll.H 52 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784
  1. /*
  2. This program is free software: you can redistribute it and/or modify
  3. it under the terms of the GNU General Public License as published by
  4. the Free Software Foundation, either version 3 of the License, or
  5. (at your option) any later version.
  6. This program is distributed in the hope that it will be useful,
  7. but WITHOUT ANY WARRANTY; without even the implied warranty of
  8. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  9. GNU General Public License for more details.
  10. You should have received a copy of the GNU General Public License
  11. along with this program. If not, see <http://www.gnu.org/licenses/>.
  12. * */
  13. /*
  14. * uses code from http://www.cse.yorku.ca/~oz/hash.html
  15. * for sdbm hash algorithm
  16. * */
  17. /*
  18. * TODO:
  19. *
  20. * * move readAll and writeAll implementation to Stream
  21. * * automatic AIO for file async IO
  22. * * socket connect()
  23. * * verify that all system call return values are checked
  24. * * verify that all code paths that may encounter exceptions
  25. * release their resources (memory etc) properly when
  26. * exceptions are encountered
  27. * * HTTP support
  28. *
  29. *
  30. * */
  31. #ifndef __INCLUDED_CPOLL_H
  32. #define __INCLUDED_CPOLL_H
  33. #include <string>
  34. #include <exception>
  35. #include <poll.h>
  36. #include <arpa/inet.h>
  37. #include <netinet/in.h>
  38. #include <sys/un.h>
  39. #include <vector>
  40. #include <sys/signalfd.h>
  41. #include <sys/eventfd.h>
  42. #include <delegate.H>
  43. #include <sys/epoll.h>
  44. #include <limits>
  45. #include "basictypes.H"
  46. #include "statemachines.H"
  47. #include <fcntl.h>
  48. #include <math.h>
  49. #ifndef WARNLEVEL
  50. #define WARNLEVEL 5
  51. #endif
  52. #ifndef WARN
  53. #define WARN(LEVEL,MSG) if(LEVEL<=WARNLEVEL){if(LEVEL<=1)cerr << "\x1B[41;1;33m"; else if(LEVEL<=2)cerr << "\x1B[1;1;1m"; cerr << MSG << "\x1B[0;0;0m" << endl;}
  54. #endif
  55. /*
  56. * CPoll: low level c++ wrapper for poll() and epoll(); can be implemented on
  57. * other OS's using mechanisms such as overlapped I/O(windows), but currently
  58. * there is only a linux implementation; for other OS's patches are welcome ;)
  59. *
  60. * simple usage example:
  61. *
  62. * char buf[4096];
  63. * char buf2[4096];
  64. * File f(1); //stdout
  65. * File f2(2); //stderr
  66. * f.read(buf, 4096, [](){cout << "read1 done" << endl;});
  67. * f2.read(buf2, 4096, [](){cout << "read2 done" << endl;});
  68. *
  69. * Poll p;
  70. * p.add(f);
  71. * p.add(f2);
  72. * p.loop(); //epoll is used
  73. *
  74. *
  75. *
  76. *
  77. * nested example:
  78. *
  79. * File f(0); //stdin
  80. * char buf[4096];
  81. * f.read(buf, 4096, [](){});
  82. * Poll poll;
  83. * poll.add(f);
  84. * Poll poll2;
  85. * poll2.add(poll);
  86. * poll2.loop();
  87. */
  88. #ifndef likely
  89. #define likely(x) __builtin_expect((x),1)
  90. #define unlikely(x) __builtin_expect((x),0)
  91. #endif
  92. namespace CP
  93. {
  94. using namespace std;
  95. class CPollException: public std::exception
  96. {
  97. public:
  98. string message;
  99. int32_t number;
  100. CPollException();
  101. CPollException(int32_t number);
  102. CPollException(string message, int32_t number = 0);
  103. ~CPollException() throw ();
  104. const char* what() const throw ();
  105. };
  106. class AbortException: public std::exception
  107. { //used for aborting the event loop
  108. public:
  109. AbortException();
  110. ~AbortException() throw ();
  111. const char* what() const throw ();
  112. };
  113. class CancelException: public std::exception
  114. { //used for cancelling a repeat[Read|Write] operation
  115. //do NOT throw this exception on one-shot operations such as read()
  116. public:
  117. CancelException();
  118. ~CancelException() throw ();
  119. const char* what() const throw ();
  120. };
  121. static inline uint64_t ntohll(uint64_t value) {
  122. // The answer is 42
  123. static const int32_t num = 42;
  124. // Check the endianness
  125. if (*reinterpret_cast<const char*>(&num) == num) {
  126. const uint32_t high_part = htonl(static_cast<uint32_t>(value >> 32));
  127. const uint32_t low_part = htonl(static_cast<uint32_t>(value & 0xFFFFFFFFLL));
  128. return (static_cast<uint64_t>(low_part) << 32) | high_part;
  129. } else {
  130. return value;
  131. }
  132. }
  133. //==============================================================
  134. //==============================================================
  135. //=================NETWORK UTILITY CLASSES======================
  136. //=====================taken from cplib=========================
  137. //==============================================================
  138. //==============================================================
  139. struct IPAddress
  140. {
  141. in_addr a;
  142. inline IPAddress() {
  143. }
  144. inline IPAddress(const char* addr/*NOT hostname*/) {
  145. inet_pton(AF_INET, addr, &a.s_addr);
  146. }
  147. inline IPAddress(const in_addr& a) {
  148. this->a = a;
  149. }
  150. inline bool operator<(const IPAddress& other) const {
  151. return ntohl(a.s_addr) < ntohl(other.a.s_addr);
  152. }
  153. inline bool operator>(const IPAddress& other) const {
  154. return ntohl(a.s_addr) > ntohl(other.a.s_addr);
  155. }
  156. inline bool operator<=(const IPAddress& other) const {
  157. return ntohl(a.s_addr) <= ntohl(other.a.s_addr);
  158. }
  159. inline bool operator>=(const IPAddress& other) const {
  160. return ntohl(a.s_addr) >= ntohl(other.a.s_addr);
  161. }
  162. inline bool operator==(const IPAddress& other) const {
  163. return a.s_addr == other.a.s_addr;
  164. }
  165. inline IPAddress operator+(const IPAddress& other) const {
  166. return IPAddress( { htonl(ntohl(a.s_addr) + ntohl(other.a.s_addr)) });
  167. }
  168. inline IPAddress operator-(const IPAddress& other) const {
  169. return IPAddress( { htonl(ntohl(a.s_addr) - ntohl(other.a.s_addr)) });
  170. }
  171. inline IPAddress operator+(int i) const {
  172. //WARN(1,a.s_addr << " " <<ntohl(a.s_addr));
  173. //cout << "a" << endl;
  174. return IPAddress( { htonl(ntohl(a.s_addr) + i) });
  175. }
  176. inline IPAddress operator-(int i) const {
  177. return IPAddress( { htonl(ntohl(a.s_addr) - i) });
  178. }
  179. string toStr() const {
  180. char tmp[INET_ADDRSTRLEN];
  181. if (inet_ntop(AF_INET, &a, tmp, INET_ADDRSTRLEN) == NULL) throw CPollException();
  182. return string(tmp);
  183. }
  184. };
  185. struct IPv6Address
  186. {
  187. in6_addr a;
  188. inline IPv6Address() {
  189. }
  190. inline IPv6Address(const char* addr) {
  191. inet_pton(AF_INET6, addr, &a.__in6_u);
  192. }
  193. inline IPv6Address(const in6_addr& a) {
  194. this->a = a;
  195. }
  196. string toStr() const {
  197. char tmp[INET_ADDRSTRLEN];
  198. if (inet_ntop(AF_INET6, &a, tmp, INET6_ADDRSTRLEN) == NULL) throw CPollException();
  199. return string(tmp);
  200. }
  201. };
  202. class EndPoint: virtual public RGC::Object
  203. {
  204. public:
  205. int32_t addressFamily;
  206. virtual void getSockAddr(sockaddr* addr) const=0;
  207. virtual void setSockAddr(const sockaddr* addr)=0;
  208. virtual int32_t getSockAddrSize() const=0;
  209. static EndPoint* fromSockAddr(const sockaddr* addr);
  210. static EndPoint* create(int32_t addressFamily);
  211. static int getSize(int32_t addressFamily);
  212. static EndPoint* construct(void* mem, int32_t addressFamily);
  213. virtual void clone(EndPoint& to) const=0;
  214. virtual ~EndPoint() {
  215. }
  216. static vector<RGC::Ref<EndPoint> > lookupHost(const char* hostname, const char* port,
  217. int32_t family = AF_UNSPEC, int32_t socktype = 0, int32_t proto = 0, int32_t flags = 0);
  218. //static EndPoint Resolve(
  219. virtual string toStr() const=0;
  220. };
  221. class IPEndPoint: public EndPoint
  222. {
  223. public:
  224. IPAddress address;
  225. in_port_t port;
  226. IPEndPoint();
  227. IPEndPoint(IPAddress address, in_port_t port);
  228. void set_addr(const sockaddr_in& addr);
  229. virtual void setSockAddr(const sockaddr* addr);
  230. IPEndPoint(const sockaddr_in& addr);
  231. virtual void getSockAddr(sockaddr* addr) const;
  232. virtual int32_t getSockAddrSize() const;
  233. virtual void clone(EndPoint& to) const;
  234. virtual string toStr() const;
  235. };
  236. class IPv6EndPoint: public EndPoint
  237. {
  238. public:
  239. IPv6Address address;
  240. in_port_t port;
  241. uint32_t flowInfo;
  242. uint32_t scopeID;
  243. IPv6EndPoint();
  244. IPv6EndPoint(IPv6Address address, in_port_t port);
  245. void set_addr(const sockaddr_in6& addr);
  246. IPv6EndPoint(const sockaddr_in6& addr);
  247. virtual void setSockAddr(const sockaddr* addr);
  248. virtual void getSockAddr(sockaddr* addr) const;
  249. virtual int32_t getSockAddrSize() const;
  250. virtual void clone(EndPoint& to) const;
  251. virtual string toStr() const;
  252. };
  253. class UNIXEndPoint: public EndPoint
  254. {
  255. public:
  256. string name;
  257. UNIXEndPoint();
  258. UNIXEndPoint(string name);
  259. void set_addr(const sockaddr_un& addr);
  260. UNIXEndPoint(const sockaddr_un& addr);
  261. virtual void setSockAddr(const sockaddr* addr);
  262. virtual void getSockAddr(sockaddr* addr) const;
  263. virtual int32_t getSockAddrSize() const;
  264. virtual void clone(EndPoint& to) const;
  265. virtual string toStr() const;
  266. };
  267. //===========================================================
  268. //======================ABSTRACT CLASSES=====================
  269. //===========================================================
  270. class BufferedOutput: virtual public RGC::Object
  271. {
  272. public:
  273. uint8_t* buffer;
  274. int bufferPos;
  275. int bufferSize;
  276. BufferedOutput() {
  277. }
  278. BufferedOutput(uint8_t* buffer, int bufferPos, int bufferSize) :
  279. buffer(buffer), bufferPos(bufferPos), bufferSize(bufferSize) {
  280. }
  281. //flushBuffer() should flush current buffer to stream, reallocate the buffer,
  282. //and reset buffer, bufferPos and/or bufferSize accordingly (see below); the function
  283. //must reserve at least minBufferAllocation bytes of space in the write buffer
  284. virtual void flushBuffer(int minBufferAllocation)=0;
  285. void flush() {
  286. flushBuffer(0);
  287. }
  288. };
  289. class Stream: virtual public RGC::Object
  290. {
  291. public:
  292. Stream() = default;
  293. Stream(const Stream& other) = delete;
  294. Stream& operator=(const Stream& other) = delete;
  295. union
  296. {
  297. struct
  298. {
  299. DelegateBase<void(int)> cb;
  300. BufferedOutput* out;
  301. int bufSize;
  302. int br;
  303. } _readToEnd;
  304. struct
  305. {
  306. DelegateBase<void(int)> cb;
  307. BufferedOutput* out;
  308. int len;
  309. int bufSize;
  310. int br;
  311. } _readChunked;
  312. struct
  313. {
  314. DelegateBase<void(int)> cb;
  315. iovec* iov;
  316. int iovcnt;
  317. int i, br;
  318. } _readvAll;
  319. struct
  320. {
  321. DelegateBase<void(int)> cb;
  322. uint8_t* buf;
  323. int len;
  324. int i;
  325. } _readAll;
  326. };
  327. union
  328. {
  329. struct
  330. {
  331. DelegateBase<void(int)> cb;
  332. iovec* iov;
  333. int iovcnt;
  334. int i, br;
  335. } _writevAll;
  336. struct
  337. {
  338. DelegateBase<void(int)> cb;
  339. const uint8_t* buf;
  340. int len;
  341. int i;
  342. } _writeAll;
  343. };
  344. void _readvCB(int r);
  345. void _readAllCB(int r);
  346. void _writevCB(int r);
  347. void _writeAllCB(int r);
  348. //sync
  349. virtual int32_t read(void* buf, int32_t len)=0; //required
  350. int32_t read(String buf) {
  351. return read(buf.data(), buf.length());
  352. }
  353. virtual int32_t readv(iovec* iov, int iovcnt) { //optional
  354. if (iovcnt <= 0) return 0;
  355. return read(iov[0].iov_base, iov[0].iov_len);
  356. }
  357. virtual int32_t readAll(void* buf, int32_t len) { //optional
  358. int off = 0;
  359. while (off < len) {
  360. int tmp = read((uint8_t*) buf + off, len - off);
  361. if (tmp <= 0) return off;
  362. off += tmp;
  363. }
  364. return off;
  365. }
  366. int32_t readAll(String buf) {
  367. return readAll(buf.data(), buf.length());
  368. }
  369. //note: may destroy the iov array
  370. virtual int32_t readvAll(iovec* iov, int iovcnt) { //optional
  371. int i = 0;
  372. int br = 0;
  373. while (i < iovcnt) {
  374. int r = readv(iov + i, iovcnt - i);
  375. if (r <= 0) break;
  376. br += r;
  377. while (r > 0 && i < iovcnt) {
  378. if ((int) iov[i].iov_len > r) {
  379. iov[i].iov_base = ((uint8_t*) iov[i].iov_base) + r;
  380. iov[i].iov_len -= r;
  381. break;
  382. } else {
  383. r -= iov[i].iov_len;
  384. i++;
  385. }
  386. }
  387. }
  388. return br;
  389. }
  390. virtual int readToEnd(BufferedOutput& out, int32_t bufSize = 4096);
  391. virtual int readChunked(BufferedOutput& out, int32_t len, int32_t bufSize = 4096);
  392. virtual int32_t write(const void* buf, int32_t len)=0; //required
  393. int32_t write(String buf) {
  394. return write(buf.data(), buf.length());
  395. }
  396. virtual int32_t writev(iovec* iov, int iovcnt) { //optional
  397. if (iovcnt <= 0) return 0;
  398. return write(iov[0].iov_base, iov[0].iov_len);
  399. }
  400. virtual int32_t writeAll(const void* buf, int32_t len) { //optional
  401. int off = 0;
  402. while (off < len) {
  403. int tmp = write((uint8_t*) buf + off, len - off);
  404. if (tmp <= 0) return off;
  405. off += tmp;
  406. }
  407. return off;
  408. }
  409. int32_t writeAll(String buf) {
  410. return writeAll(buf.data(), buf.length());
  411. }
  412. virtual int32_t writevAll(iovec* iov, int iovcnt) { //optional
  413. int i = 0;
  414. int br = 0;
  415. while (i < iovcnt) {
  416. int r = writev(iov + i, iovcnt - i);
  417. if (r <= 0) break;
  418. br += r;
  419. while (r > 0 && i < iovcnt) {
  420. if ((int) iov[i].iov_len > r) {
  421. iov[i].iov_base = ((uint8_t*) iov[i].iov_base) + r;
  422. iov[i].iov_len -= r;
  423. break;
  424. } else {
  425. r -= iov[i].iov_len;
  426. i++;
  427. }
  428. }
  429. }
  430. return br;
  431. }
  432. //async
  433. //all new subclasses of Stream now should only need to implement the hybrid[Read|Write] functions
  434. //to provide async behavior; apps using [read|write](..., cb) will still work through the
  435. //compatibility wrappers
  436. //returns -2 to indicate that the operation is scheduled to be completed asynchronously; otherwise
  437. //the request has been completed synchronously
  438. //UPDATE: hybrid[Read|Write] has been canceled because it is extremely hard for users to design applications
  439. // to deal with 2 programming models at once and it is very error-prone
  440. /*virtual int32_t hybridRead(void* buf, int32_t len, const Callback& cb, bool repeat = false) { //optional (required for async)
  441. }
  442. virtual int32_t hybridWrite(const void* buf, int32_t len, const Callback& cb, bool repeat =
  443. false) { //optional (required for async)
  444. }*/
  445. virtual void read(void* buf, int32_t len, const Callback& cb, bool repeat = false)=0;
  446. void read(String buf, const Callback& cb, bool repeat = false) {
  447. return read(buf.data(), buf.length(), cb, repeat);
  448. }
  449. virtual void readv(iovec* iov, int iovcnt, const Callback& cb, bool repeat = false) {
  450. if (iovcnt <= 0) {
  451. cb(0);
  452. return;
  453. }
  454. return read(iov[0].iov_base, iov[0].iov_len, cb, repeat);
  455. }
  456. inline void repeatRead(void* buf, int32_t len, const Callback& cb) {
  457. read(buf, len, cb, true);
  458. }
  459. inline void repeatRead(String buf, const Callback& cb) {
  460. read(buf, cb, true);
  461. }
  462. inline void repeatReadv(iovec* iov, int iovcnt, const Callback& cb) {
  463. readv(iov, iovcnt, cb, true);
  464. }
  465. virtual void readAll(void* buf, int32_t len, const Callback& cb);
  466. void readAll(String buf, const Callback& cb) {
  467. return readAll(buf.data(), buf.length(), cb);
  468. }
  469. virtual void readvAll(iovec* iov, int iovcnt, const Callback& cb);
  470. virtual void readToEnd(BufferedOutput& out, const Callback& cb, int bufSize = 4096);
  471. virtual void readChunked(BufferedOutput& out, int32_t len, const Callback& cb, int bufSize =
  472. 4096);
  473. virtual void write(const void* buf, int32_t len, const Callback& cb, bool repeat = false)=0;
  474. void write(String buf, const Callback& cb, bool repeat = false) {
  475. return write(buf.data(), buf.length(), cb, repeat);
  476. }
  477. virtual void writev(iovec* iov, int iovcnt, const Callback& cb, bool repeat = false) {
  478. if (iovcnt <= 0) {
  479. cb(0);
  480. return;
  481. }
  482. return write(iov[0].iov_base, iov[0].iov_len, cb, repeat);
  483. }
  484. inline void repeatWrite(const void* buf, int32_t len, const Callback& cb) {
  485. write(buf, len, cb, true);
  486. }
  487. inline void repeatWrite(String buf, const Callback& cb) {
  488. write(buf, cb, true);
  489. }
  490. inline void repeatWritev(iovec* iov, int iovcnt, const Callback& cb) {
  491. writev(iov, iovcnt, cb, true);
  492. }
  493. virtual void writeAll(const void* buf, int32_t len, const Callback& cb);
  494. void writeAll(String buf, const Callback& cb) {
  495. return writeAll(buf.data(), buf.length(), cb);
  496. }
  497. virtual void writevAll(iovec* iov, int iovcnt, const Callback& cb);
  498. //only for async read/write operations
  499. virtual void cancelRead()=0;
  500. virtual void cancelWrite()=0;
  501. //=========misc===========
  502. //sync
  503. virtual void close()=0; //may block
  504. virtual void flush()=0;
  505. //async
  506. virtual void close(const Callback& cb)=0;
  507. virtual void flush(const Callback& cb)=0;
  508. //only for streams with a read buffer; other streams should just return 0
  509. //calling this function should "consume" that buffer, but should not perform
  510. //further reading operations; this function should NOT block.
  511. //returns length of buffer; address of buffer is put in buf
  512. //if maxlen==-1, consume entire contiguous buffer
  513. virtual int32_t readBuffer(void*& buf, int32_t maxlen) {
  514. return 0;
  515. }
  516. inline String readBuffer(int32_t maxlen) {
  517. void* tmp = NULL;
  518. int32_t l = readBuffer(tmp, maxlen);
  519. return {(char*)tmp,l};
  520. }
  521. //freeBuffer() must be called following every readBuffer() call;
  522. //after calling readBuffer() and before calling freeBuffer(),
  523. //all other operations are undefined
  524. virtual void freeBuffer(void* buf, int32_t len) {
  525. }
  526. //returns NULL if it doesn't provide a buffer; in that case you need to create
  527. //a StreamBuffer yourself
  528. virtual BufferedOutput* getBufferedOutput();
  529. };
  530. class StreamBuffer: public BufferedOutput
  531. {
  532. public:
  533. StreamBuffer(const StreamBuffer& other) = delete;
  534. StreamBuffer& operator=(const StreamBuffer& other) = delete;
  535. RGC::Ref<Stream> output;
  536. StreamBuffer();
  537. StreamBuffer(Stream& s, int bufsize = 4096);
  538. virtual void flushBuffer(int minBufferAllocation);
  539. ~StreamBuffer() {
  540. if (this->buffer != NULL) free(this->buffer);
  541. }
  542. }
  543. ;
  544. class FixedMemoryStream;
  545. class MemoryStream;
  546. class StringStream;
  547. //StreamWriter will directly access the BufferedOutput's buffer
  548. //(for performance reasons) and increment bufferPos accordingly.
  549. //if bufferPos reaches bufferSize, it will call the BufferedOutput's flushBuffer() method
  550. class StreamWriter: public RGC::Object
  551. {
  552. public:
  553. StreamWriter(const StreamWriter& other) = delete;
  554. StreamWriter& operator=(const StreamWriter& other) = delete;
  555. RGC::Ref<Object> outp;
  556. BufferedOutput* buffer;
  557. char _sb[sizeof(StreamBuffer)];
  558. StreamBuffer& sb;
  559. void write(const void* s, int len) {
  560. if (buffer->bufferSize - buffer->bufferPos < len) buffer->flushBuffer(len);
  561. memcpy(buffer->buffer + buffer->bufferPos, s, len);
  562. buffer->bufferPos += len;
  563. }
  564. void write(const MemoryBuffer& buf) {
  565. write(buf.data(), buf.length());
  566. }
  567. void write(string s) {
  568. write(s.data(), s.length());
  569. }
  570. void write(String s) {
  571. write(s.data(), s.length());
  572. }
  573. void write(const char* s) {
  574. write((const uint8_t*) s, strlen(s));
  575. }
  576. void write(char c) {
  577. if (unlikely(buffer->bufferSize - buffer->bufferPos < 1)) buffer->flushBuffer(1);
  578. buffer->buffer[buffer->bufferPos] = c;
  579. buffer->bufferPos++;
  580. }
  581. void write(int8_t i) {
  582. //snprintf() writes a null byte and possibly a negative sign, so gotta reserve 5 bytes of buffer space
  583. if (unlikely(buffer->bufferSize - buffer->bufferPos < 5)) buffer->flushBuffer(5);
  584. buffer->bufferPos += snprintf((char*) buffer->buffer + buffer->bufferPos, 5, "%hhi", i);
  585. }
  586. void write(int16_t i) {
  587. if (unlikely(buffer->bufferSize - buffer->bufferPos < 7)) buffer->flushBuffer(7);
  588. buffer->bufferPos += snprintf((char*) buffer->buffer + buffer->bufferPos, 7, "%hi", i);
  589. }
  590. void write(int32_t i) {
  591. if (unlikely(buffer->bufferSize - buffer->bufferPos < 12)) buffer->flushBuffer(12);
  592. buffer->bufferPos += snprintf((char*) buffer->buffer + buffer->bufferPos, 12, "%i", i);
  593. }
  594. void write(int64_t i) {
  595. if (unlikely(buffer->bufferSize - buffer->bufferPos < 22)) buffer->flushBuffer(22);
  596. buffer->bufferPos += snprintf((char*) buffer->buffer + buffer->bufferPos, 22, "%lli", i);
  597. }
  598. void write(uint8_t i) {
  599. if (unlikely(buffer->bufferSize - buffer->bufferPos < 4)) buffer->flushBuffer(4);
  600. buffer->bufferPos += snprintf((char*) buffer->buffer + buffer->bufferPos, 4, "%hhu", i);
  601. }
  602. void write(uint16_t i) {
  603. if (unlikely(buffer->bufferSize - buffer->bufferPos < 6)) buffer->flushBuffer(6);
  604. buffer->bufferPos += snprintf((char*) buffer->buffer + buffer->bufferPos, 6, "%hu", i);
  605. }
  606. void write(uint32_t i) {
  607. if (unlikely(buffer->bufferSize - buffer->bufferPos < 11)) buffer->flushBuffer(11);
  608. buffer->bufferPos += snprintf((char*) buffer->buffer + buffer->bufferPos, 11, "%u", i);
  609. }
  610. void write(uint64_t i) {
  611. if (unlikely(buffer->bufferSize - buffer->bufferPos < 21)) buffer->flushBuffer(21);
  612. buffer->bufferPos += snprintf((char*) buffer->buffer + buffer->bufferPos, 21, "%llu", i);
  613. }
  614. char* beginWrite(int len) {
  615. if (unlikely(buffer->bufferSize - buffer->bufferPos < len)) buffer->flushBuffer(len);
  616. return (char*) buffer->buffer + buffer->bufferPos;
  617. }
  618. void endWrite(int len) {
  619. buffer->bufferPos += len;
  620. }
  621. template<class ... P> void writeF(const char* fmt, P&&... p) {
  622. int minSpace = 0;
  623. if (minSpace + 1 > buffer->bufferSize - buffer->bufferPos) {
  624. redo: buffer->flushBuffer(minSpace + 1);
  625. }
  626. minSpace = snprintf((char*) buffer->buffer + buffer->bufferPos,
  627. buffer->bufferSize - buffer->bufferPos, fmt, std::forward<P>(p)...);
  628. if (minSpace + 1 > buffer->bufferSize - buffer->bufferPos) goto redo;
  629. buffer->bufferPos += minSpace;
  630. }
  631. void flush() {
  632. buffer->flushBuffer(0);
  633. }
  634. StreamWriter(BufferedOutput& s);
  635. StreamWriter(Stream& s);
  636. StreamWriter(MemoryStream& s);
  637. StreamWriter(StringStream& s);
  638. ~StreamWriter();
  639. };
  640. class StreamReader: public Stream
  641. { //writes are simply passed through
  642. public:
  643. typedef Delegate<void(const string&)> Callback;
  644. typedef Delegate<void(int)> StreamCallback;
  645. RGC::Ref<Stream> input;
  646. //void* sr;
  647. newStreamReader _sr;
  648. bool* deletionFlag;
  649. Stream* out_s;
  650. union
  651. {
  652. DelegateBase<void(const string&)> cb;
  653. DelegateBase<void(int)> cb_s;
  654. };
  655. //BufferedOutput* tmp_out;
  656. string tmp;
  657. string tmp_delim;
  658. int tmp_i;
  659. int bufSize;
  660. bool eof;
  661. /*void* curBuffer;
  662. int curBufferLen;
  663. int bufferSize;
  664. bool bufferIsBorrowed;*/
  665. StreamReader(Stream& input, int bufsize = 4096);
  666. ~StreamReader();
  667. string readTo(char delim);
  668. string readTo(const char* delim, int delimLen);
  669. string readTo(string delim);
  670. string readLine();
  671. int readTo(char delim, Stream& s);
  672. int readTo(const char* delim, int delimLen, Stream& s);
  673. int readTo(string delim, Stream& s);
  674. int readLine(Stream& s);
  675. void readTo(char delim, const Callback& cb);
  676. //*delim MUST NOT BE DELETED FOR THE ENTIRE DURATION OF THE READTO REQUEST!!!!!
  677. //it does not make a copy of delim!!!!!
  678. void readTo(const char* delim, int delimLen, const Callback& cb);
  679. void readTo(string delim, const Callback& cb);
  680. void readLine(const Callback& cb);
  681. void readTo(char delim, Stream& s, const StreamCallback& cb);
  682. void readTo(const char* delim, int delimLen, Stream& s, const StreamCallback& cb);
  683. void readTo(string delim, Stream& s, const StreamCallback& cb);
  684. void readLine(Stream& s, const StreamCallback& cb);
  685. //sync
  686. virtual int32_t read(void* buf, int32_t len);
  687. virtual int32_t write(const void* buf, int32_t len) {
  688. return input->write(buf, len);
  689. }
  690. virtual int32_t writeAll(const void* buf, int32_t len) {
  691. return input->writeAll(buf, len);
  692. }
  693. //async
  694. virtual void read(void* buf, int32_t len, const CP::Callback& cb, bool repeat = false);
  695. virtual void readAll(void* buf, int32_t len, const CP::Callback& cb);
  696. virtual void write(const void* buf, int32_t len, const CP::Callback& cb,
  697. bool repeat = false) {
  698. return input->write(buf, len, cb, repeat);
  699. }
  700. virtual void writeAll(const void* buf, int32_t len, const CP::Callback& cb) {
  701. return input->writeAll(buf, len, cb);
  702. }
  703. //does NOT cancel readTo() or readLine() operations
  704. virtual void cancelRead();
  705. virtual void cancelWrite();
  706. //=========misc===========
  707. //sync
  708. virtual void close();
  709. virtual void flush();
  710. //async
  711. virtual void close(const CP::Callback& cb);
  712. virtual void flush(const CP::Callback& cb);
  713. virtual int32_t readBuffer(void*& buf, int32_t maxlen);
  714. virtual void freeBuffer(void* buf, int32_t len);
  715. //internal
  716. bool _loop(bool);
  717. void _beginRead();
  718. void _doSyncRead();
  719. void _readCB(int i);
  720. }
  721. ;
  722. //===========================================================
  723. //======================UTILITY CLASSES======================
  724. //===========================================================
  725. //===========================================================
  726. //======================DATA STRUCTURES======================
  727. //===========================================================
  728. enum class Events
  729. : event_t
  730. {
  731. none = 0, in = 1, out = 2, other = 4, all = 7
  732. }
  733. ;
  734. static inline Events operator&(Events e1, Events e2) {
  735. return (Events) (((event_t) e1) & ((event_t) e2));
  736. }
  737. static inline Events operator|(Events e1, Events e2) {
  738. return (Events) (((event_t) e1) | ((event_t) e2));
  739. }
  740. static inline Events operator^(Events e1, Events e2) {
  741. return (Events) (((event_t) e1) ^ ((event_t) e2));
  742. }
  743. static inline const Events& operator&=(Events& e1, Events e2) {
  744. return (Events&) (((event_t&) e1) &= ((event_t) e2));
  745. }
  746. static inline const Events& operator|=(Events& e1, Events e2) {
  747. return (Events&) (((event_t&) e1) |= ((event_t) e2));
  748. }
  749. static inline const Events& operator^=(Events& e1, Events e2) {
  750. return (Events&) (((event_t&) e1) ^= ((event_t) e2));
  751. }
  /// Identifies the kind of I/O operation recorded in an EventHandlerData.
  /// Values are consecutive; `lastItem` is one past the last real operation.
  enum class Operations : uint8_t {
      none = 0,
      read = 1,
      write = 2,
      send = 3,
      recv = 4,
      sendTo = 5,
      recvFrom = 6,
      accept = 7,
      readAll = 8,
      writeAll = 9,
      sendAll = 10,
      recvAll = 11,
      shutdown = 12,
      connect = 13,
      close = 14,
      readv = 15,
      writev = 16,
      lastItem = 17
  };
  /// Extra status flags passed along with an event to dispatch().
  struct EventData {
      bool hungUp; // NOTE(review): presumably "peer hung up" (EPOLLHUP) — confirm
      bool error;  // NOTE(review): presumably "error condition" (EPOLLERR) — confirm
  };
  763. struct EventHandlerData
  764. {
  765. public:
  766. Callback cb;
  767. union miscUnion
  768. {
  769. struct
  770. {
  771. void* buf;
  772. union
  773. {
  774. EndPoint* ep; //recvFrom
  775. const EndPoint* const_ep; //sendTo
  776. };
  777. int32_t len;
  778. int32_t flags;
  779. int32_t len_done;
  780. } bufferIO;
  781. struct
  782. {
  783. iovec* iov;
  784. int iovcnt;
  785. } bufferIOv;
  786. struct
  787. {
  788. int32_t how;
  789. } shutdown;
  790. struct
  791. {
  792. eventfd_t evt;
  793. } eventfd;
  794. } misc;
  795. Delegate<bool(Events event, EventHandlerData& ed, const EventData& evtd, bool confident)> opcb;
  796. Operations op;
  797. enum class States
  798. : uint8_t {
  799. invalid = 0, once = 1, repeat
  800. } state;
  801. EventHandlerData() :
  802. state(States::invalid) {
  803. }
  804. };
  /// Number of per-handle event slots (used as the size of File::eventData).
  static constexpr int32_t numEvents = 2;
  806. //============================================================
  807. //============================================================
  808. //=======================MAIN CLASSES=========================
  809. //============================================================
  810. //============================================================
  811. class Handle: virtual public RGC::Object
  812. {
  813. public:
  814. Handle(const Handle& other) = delete;
  815. Handle& operator=(const Handle& other) = delete;
  816. //void* __private;
  817. HANDLE handle;
  818. bool _supportsEPoll;
  819. Handle();
  820. Handle(HANDLE handle);
  821. Delegate<void(Handle& h, Events old_events)> onEventsChange;
  822. Delegate<void(Handle& h)> onClose;
  823. virtual void init(HANDLE handle);
  824. virtual void deinit();
  825. ///calls the callback associated with the event
  826. ///only accepts one event
  827. virtual bool dispatch(Events event, const EventData& evtd, bool confident)=0;
  828. virtual Events getEvents()=0;
  829. virtual Events dispatchMultiple(Events events, Events confident, const EventData& evtd);
  830. ///get some events from the queue.
  831. virtual Events wait(EventData& evtd);
  832. virtual Events waitAndDispatch();
  833. virtual void loop();
  834. void setBlocking(bool b = true) {
  835. int f = fcntl(handle, F_GETFL);
  836. if (b && (f & O_NONBLOCK)) {
  837. fcntl(handle, F_SETFL, f & ~O_NONBLOCK);
  838. } else if (!b && (f & O_NONBLOCK) == 0) {
  839. fcntl(handle, F_SETFL, f | O_NONBLOCK);
  840. }
  841. }
  842. //void close();
  843. ~Handle();
  844. };
  845. class File: public Handle, public Stream
  846. {
  847. public:
  848. EventHandlerData eventData[numEvents];
  849. bool* deletionFlag;
  850. Events preDispatchEvents;
  851. bool dispatching;
  852. File();
  853. File(HANDLE handle);
  854. virtual void init(HANDLE handle);
  855. Events _getEvents();
  856. ///only accepts one event
  857. EventHandlerData* beginAddEvent(Events event);
  858. void endAddEvent(Events event, bool repeat = false);
  859. void cancel(Events event);
  860. int32_t read(void* buf, int32_t len) override;
  861. int32_t readv(iovec* iov, int iovcnt) override;
  862. int32_t readAll(void* buf, int32_t len) {
  863. return Stream::readAll(buf, len);
  864. }
  865. int32_t write(const void* buf, int32_t len) override;
  866. int32_t writev(iovec* iov, int iovcnt) override;
  867. int32_t writeAll(const void* buf, int32_t len) {
  868. return Stream::writeAll(buf, len);
  869. }
  870. int32_t send(const void* buf, int32_t len, int32_t flags = 0);
  871. int32_t sendAll(const void* buf, int32_t len, int32_t flags = 0);
  872. int32_t recv(void* buf, int32_t len, int32_t flags = 0);
  873. int32_t recvAll(void* buf, int32_t len, int32_t flags = 0);
  874. Events checkEvents(Events events);
  875. virtual bool doOperation(Events event, EventHandlerData& ed, const EventData& evtd,
  876. EventHandlerData::States oldstate, bool confident);
  877. bool dispatch(Events event, const EventData& evtd, bool confident, bool& deletionFlag);
  878. bool dispatch(Events event, const EventData& evtd, bool confident) override {
  879. bool d = false;
  880. deletionFlag = &d;
  881. bool r = dispatch(event, evtd, d);
  882. if (!d) deletionFlag = NULL;
  883. return r;
  884. }
  885. Events dispatchMultiple(Events events, Events confident, const EventData& evtd) override;
  886. inline void fillIOEventHandlerData(EventHandlerData* ed, void* buf, int32_t len,
  887. const Callback& cb, Events e, Operations op);
  888. inline void fillIOEventHandlerData(EventHandlerData* ed, iovec* iov, int iovcnt,
  889. const Callback& cb, Events e, Operations op);
  890. void read(void* buf, int32_t len, const Callback& cb, bool repeat = false) override;
  891. void readv(iovec* iov, int iovcnt, const Callback& cb, bool repeat = false) override;
  892. void readAll(void* buf, int32_t len, const Callback& cb) override;
  893. void write(const void* buf, int32_t len, const Callback& cb, bool repeat = false) override;
  894. void writev(iovec* iov, int iovcnt, const Callback& cb, bool repeat = false) override;
  895. void writeAll(const void* buf, int32_t len, const Callback& cb) override;
  896. void recv(void* buf, int32_t len, int32_t flags, const Callback& cb, bool repeat = false);
  897. inline void repeatRecv(void* buf, int32_t len, int32_t flags, const Callback& cb) {
  898. recv(buf, len, flags, cb, true);
  899. }
  900. void recvAll(void* buf, int32_t len, int32_t flags, const Callback& cb);
  901. void send(const void* buf, int32_t len, int32_t flags, const Callback& cb,
  902. bool repeat = false);
  903. inline void repeatSend(const void* buf, int32_t len, int32_t flags, const Callback& cb) {
  904. send(buf, len, flags, cb, true);
  905. }
  906. void sendAll(const void* buf, int32_t len, int32_t flags, const Callback& cb);
  907. virtual Events getEvents() override final {
  908. return _getEvents();
  909. }
  910. ~File();
  911. //=========misc===========
  912. //sync
  913. virtual void close(); //may block
  914. virtual void flush();
  915. //async
  916. virtual void close(const Callback& cb);
  917. virtual void flush(const Callback& cb);
  918. //misc
  919. void cancelRead();
  920. void cancelWrite();
  921. void cancelSend() {
  922. cancelWrite();
  923. }
  924. void cancelRecv() {
  925. cancelRead();
  926. }
  927. void waitForEvent(Events event, const Callback& cb, bool repeat = false);
  928. };
  929. class Socket: public File
  930. {
  931. public:
  932. union
  933. {
  934. DelegateBase<void(Socket*)> _acceptCB;
  935. DelegateBase<void(HANDLE)> _acceptHandleCB;
  936. };
  937. //Delegate<void(Socket*)> _acceptCB;
  938. int32_t addressFamily, type, protocol;
  939. //RGC::Ref<EndPoint> peer;
  940. Socket();
  941. Socket(HANDLE handle, int32_t d, int32_t t, int32_t p);
  942. Socket(int32_t d, int32_t t = SOCK_STREAM, int32_t p = 0);
  943. virtual void init(HANDLE handle, int32_t d, int32_t t, int32_t p);
  944. virtual void init(int32_t d, int32_t t, int32_t p);
  945. //the caller must release() or free() the returned object
  946. EndPoint* getLocalEndPoint();
  947. //the caller must release() or free() the returned object
  948. EndPoint* getRemoteEndPoint();
  949. virtual bool doOperation(Events event, EventHandlerData& ed, const EventData& evtd,
  950. EventHandlerData::States oldstate, bool confident) override;
  951. void connect(const sockaddr *addr, int32_t addr_size);
  952. void connect(const EndPoint &ep);
  953. void connect(const char* hostname, const char* port, int32_t family = AF_UNSPEC,
  954. int32_t socktype = 0, int32_t proto = 0, int32_t flags = 0);
  955. void bind(const sockaddr *addr, int32_t addr_size);
  956. void bind(const EndPoint &ep);
  957. //initsock is called right after creating the socket; you can use it to set socket options
  958. //such as SO_REUSEPORT that need to be set before binding
  959. void bind(const char* hostname, const char* port, int32_t family = AF_UNSPEC,
  960. int32_t socktype = 0, int32_t proto = 0, int32_t flags = 0,
  961. Callback initsock = nullptr);
  962. int32_t shutdown(int32_t how);
  963. void shutdown(int32_t how, const Callback& cb);
  964. void listen(int32_t backlog = 8);
  965. //the caller must release() or free() the returned object
  966. Socket* accept();
  967. HANDLE acceptHandle();
  968. void connect(const sockaddr *addr, int32_t addr_size, const Callback& cb);
  969. void connect(const EndPoint &ep, const Callback& cb);
  970. void connect(const char* hostname, const char* port, const Callback& cb, int32_t family =
  971. AF_UNSPEC, int32_t socktype = 0, int32_t proto = 0, int32_t flags = 0);
  972. //callback function must release() or free() the received object
  973. void accept(const Delegate<void(Socket*)>& cb, bool repeat = false);
  974. void acceptHandle(const Delegate<void(HANDLE)>& cb, bool repeat = false);
  975. inline void repeatAccept(const Delegate<void(Socket*)>& cb) {
  976. accept(cb, true);
  977. }
  978. inline void repeatAcceptHandle(const Delegate<void(HANDLE)>& cb) {
  979. acceptHandle(cb, true);
  980. }
  981. int32_t recvFrom(void* buf, int32_t len, int32_t flags, EndPoint& ep);
  982. int32_t sendTo(const void* buf, int32_t len, int32_t flags, const EndPoint& ep);
  983. //ep has to remain valid for the entire duration of the request
  984. void recvFrom(void* buf, int32_t len, int32_t flags, EndPoint& ep, const Callback& cb,
  985. bool repeat = false);
  986. inline void repeatRecvFrom(void* buf, int32_t len, int32_t flags, EndPoint& ep,
  987. const Callback& cb) {
  988. recvFrom(buf, len, flags, ep, cb, true);
  989. }
  990. void sendTo(const void* buf, int32_t len, int32_t flags, const EndPoint& ep,
  991. const Callback& cb, bool repeat = false);
  992. inline void repeatSendTo(const void* buf, int32_t len, int32_t flags, const EndPoint& ep,
  993. const Callback& cb) {
  994. sendTo(buf, len, flags, ep, cb, true);
  995. }
  996. };
  997. class SignalFD: public Handle
  998. {
  999. public:
  1000. static int32_t MAX_EVENTS;
  1001. typedef struct signalfd_siginfo Signal;
  1002. Delegate<void(Signal& sig)> callback;
  1003. sigset_t mask;
  1004. SignalFD(HANDLE handle, const sigset_t& mask);
  1005. SignalFD(const sigset_t& mask, int32_t flags);
  1006. virtual bool dispatch(Events event, const EventData& evtd, bool confident) override;
  1007. virtual Events getEvents();
  1008. };
  1009. class Timer: public Handle
  1010. {
  1011. public:
  1012. static int32_t MAX_EVENTS;
  1013. Callback cb;
  1014. struct timespec interval;
  1015. bool* deletionFlag;
  1016. bool dispatching;
  1017. //if interval is 0, timer is disabled; timer is always recurring (unless disabled)
  1018. void setInterval(struct timespec interval);
  1019. void setInterval(uint64_t interval_ms);
  1020. void init(HANDLE handle);
  1021. void init(HANDLE handle, struct timespec interval);
  1022. void init(HANDLE handle, uint64_t interval_ms);
  1023. void init(struct timespec interval);
  1024. void init(uint64_t interval_ms = 0);
  1025. void close();
  1026. Timer(HANDLE handle);
  1027. Timer(HANDLE handle, uint64_t interval_ms);
  1028. Timer(HANDLE handle, struct timespec interval);
  1029. Timer(uint64_t interval_ms = 0);
  1030. Timer(struct timespec interval);
  1031. ~Timer();
  1032. struct timespec getInterval();
  1033. bool running();
  1034. void setCallback(const Callback& cb);
  1035. virtual bool dispatch(Events event, const EventData& evtd, bool confident) override;
  1036. virtual Events getEvents();
  1037. };
  1038. class EventFD: public File
  1039. {
  1040. public:
  1041. Delegate<void(eventfd_t)> cb;
  1042. EventFD(HANDLE handle);
  1043. EventFD(uint32_t initval = 0, int32_t flags = 0);
  1044. virtual bool doOperation(Events event, EventHandlerData& ed, const EventData& evtd,
  1045. EventHandlerData::States oldstate, bool confident) override;
  1046. eventfd_t getEvent();
  1047. void getEvent(const Delegate<void(eventfd_t)>& cb, bool repeat = false);
  1048. void repeatGetEvent(const Delegate<void(eventfd_t)>& cb) {
  1049. getEvent(cb, true);
  1050. }
  1051. int32_t sendEvent(eventfd_t evt = 1);
  1052. void sendEvent(eventfd_t evt, const Delegate<void(int32_t)>& cb);
  1053. };
  //XXX: AIO support in the Linux kernel is incomplete and
  // has serious limitations, such as:
  // - files have to be opened with O_DIRECT
  // - O_DIRECT implies that all I/O requests have to
  //   be block-aligned
  //For these reasons, AIO is not implemented for now.
  1061. /*class AIO: public SignalFD
  1062. {
  1063. };*/
  1064. //epoll wrapper
  1065. class EPoll: public Handle
  1066. {
  1067. public:
  1068. static int32_t MAX_EVENTS;
  1069. epoll_event* curEvents;
  1070. int32_t curIndex, curLength;
  1071. int32_t active;
  1072. HANDLE cur_handle;
  1073. bool cur_deleted;
  1074. //bool debug;
  1075. //Events cur_last_events;
  1076. //map<HANDLE, Ref<Handle> > handles;
  1077. //set<Handle*> tmp_deleted;
  1078. //bool has_deleted;
  1079. EPoll(HANDLE handle);
  1080. EPoll();
  1081. int32_t _doEPoll(int32_t timeout);
  1082. virtual bool dispatch(Events event, const EventData& evtd, bool confident) override;
  1083. virtual Events dispatchMultiple(Events event, Events confident, const EventData& evtd)
  1084. override;
  1085. virtual Events getEvents() override;
  1086. virtual Events waitAndDispatch() override;
  1087. inline void applyHandle(Handle& h, Events old_e);
  1088. void add(Handle& h);
  1089. void del(Handle& h);
  1090. };
  1091. class NewEPoll: public Handle
  1092. {
  1093. public:
  1094. static int32_t MAX_EVENTS;
  1095. struct drainInfo
  1096. {
  1097. Handle* h;
  1098. Events new_e;
  1099. };
  1100. vector<drainInfo> _pending;
  1101. vector<drainInfo>* _draining;
  1102. Handle* _dispatchingHandle;
  1103. epoll_event* _curEvents;
  1104. int32_t _curIndex, _curLength;
  1105. bool _dispatchingDeleted;
  1106. NewEPoll(HANDLE h);
  1107. NewEPoll();
  1108. virtual bool dispatch(Events event, const EventData& evtd, bool confident) override;
  1109. virtual Events dispatchMultiple(Events event, Events confident, const EventData& evtd)
  1110. override;
  1111. virtual Events getEvents() override;
  1112. virtual Events waitAndDispatch() override;
  1113. void add(Handle& h);
  1114. void del(Handle& h);
  1115. bool _doIteration(int timeout);
  1116. void _doDispatch(const epoll_event& event);
  1117. void _drainHandle(Handle& h, Events new_e);
  1118. void _queueHandle(Handle& h, Events new_e);
  1119. void _applyHandle(Handle& h, Events old_e);
  1120. };
  1121. typedef NewEPoll Poll;
  1122. class StandardStream: public Stream
  1123. {
  1124. public:
  1125. StandardStream(const StandardStream& other) = delete;
  1126. StandardStream& operator=(const StandardStream& other) = delete;
  1127. CP::File in, out;
  1128. StandardStream();
  1129. template<class P> void addToPoll(P& p) {
  1130. p.add(in);
  1131. p.add(out);
  1132. }
  1133. template<class P> void delFromPoll(P& p) {
  1134. p.del(in);
  1135. p.del(out);
  1136. }
  1137. void setBlocking(bool b = true) {
  1138. in.setBlocking(b);
  1139. out.setBlocking(b);
  1140. }
  1141. //sync
  1142. virtual int32_t read(void* buf, int32_t len);
  1143. virtual int32_t readAll(void* buf, int32_t len);
  1144. virtual int32_t write(const void* buf, int32_t len);
  1145. virtual int32_t writeAll(const void* buf, int32_t len);
  1146. //async
  1147. virtual void read(void* buf, int32_t len, const Callback& cb, bool repeat = false);
  1148. virtual void readAll(void* buf, int32_t len, const Callback& cb);
  1149. virtual void write(const void* buf, int32_t len, const Callback& cb, bool repeat = false);
  1150. virtual void writeAll(const void* buf, int32_t len, const Callback& cb);
  1151. virtual void cancelRead();
  1152. virtual void cancelWrite();
  1153. //=========misc===========
  1154. //sync
  1155. virtual void close(); //may block
  1156. virtual void flush();
  1157. //async
  1158. virtual void close(const Callback& cb);
  1159. virtual void flush(const Callback& cb);
  1160. };
  1161. class FixedMemoryStream: public Stream, public BufferedOutput, public MemoryBuffer
  1162. {
  1163. public:
  1164. FixedMemoryStream(const FixedMemoryStream& other) = delete;
  1165. FixedMemoryStream& operator=(const FixedMemoryStream& other) = delete;
  1166. //uint8_t* data;
  1167. //int len, pos;
  1168. int len;
  1169. FixedMemoryStream();
  1170. FixedMemoryStream(void* data, int len);
  1171. //sync
  1172. virtual int32_t read(void* buf, int32_t len);
  1173. virtual int32_t readAll(void* buf, int32_t len);
  1174. virtual int32_t write(const void* buf, int32_t len);
  1175. virtual int32_t writeAll(const void* buf, int32_t len);
  1176. //async
  1177. virtual void read(void* buf, int32_t len, const Callback& cb, bool repeat = false);
  1178. virtual void readAll(void* buf, int32_t len, const Callback& cb);
  1179. virtual void write(const void* buf, int32_t len, const Callback& cb, bool repeat = false);
  1180. virtual void writeAll(const void* buf, int32_t len, const Callback& cb);
  1181. virtual void cancelRead();
  1182. virtual void cancelWrite();
  1183. //=========misc===========
  1184. //sync
  1185. virtual void close();
  1186. virtual void flush();
  1187. //async
  1188. virtual void close(const Callback& cb);
  1189. virtual void flush(const Callback& cb);
  1190. virtual int32_t readBuffer(void*& buf, int32_t maxlen);
  1191. virtual void flushBuffer(int minBufferAllocation);
  1192. virtual BufferedOutput* getBufferedOutput() override;
  1193. inline virtual uint8_t* data() const final {
  1194. return buffer;
  1195. }
  1196. inline virtual int length() const final {
  1197. return len;
  1198. }
  1199. };
  1200. class MemoryStream: public FixedMemoryStream
  1201. {
  1202. public:
  1203. MemoryStream(int capacity = 4096);
  1204. ~MemoryStream();
  1205. void ensureCapacity(int c);
  1206. virtual int32_t write(const void* buf, int32_t len);
  1207. virtual int32_t writeAll(const void* buf, int32_t len);
  1208. virtual void close();
  1209. virtual void flush() {
  1210. this->flushBuffer(0);
  1211. }
  1212. void setLength(int l) {
  1213. len = l;
  1214. }
  1215. void setPosition(int i) {
  1216. bufferPos = i;
  1217. }
  1218. void clear();
  1219. virtual void flushBuffer(int minBufferAllocation);
  1220. //user must delete the MemoryStream instance after this call
  1221. void keepBuffer();
  1222. //allocates space in the buffer after the current position
  1223. CP::String beginAppend(int minLen) {
  1224. ensureCapacity(bufferPos + minLen);
  1225. return {(char*)buffer+bufferPos,bufferSize-bufferPos};
  1226. }
  1227. void endAppend(int len) {
  1228. if (bufferPos + len > bufferSize) throw overflow_error(
  1229. "invalid length parameter to MemoryStream::endAppend()");
  1230. bufferPos += len;
  1231. if (bufferPos > this->len) this->len = bufferPos;
  1232. }
  1233. };
  1234. class StringPool: public virtual RGC::Object, public RGC::Allocator
  1235. {
  1236. public:
  1237. StringPool(const StringPool& other) = delete;
  1238. StringPool& operator=(const StringPool& other) = delete;
  1239. struct _pageHeader
  1240. {
  1241. _pageHeader* next;
  1242. };
  1243. struct state
  1244. {
  1245. _pageHeader* _curPage;
  1246. _pageHeader* _curRawItem;
  1247. int _curIndex;
  1248. };
  1249. _pageHeader* _firstPage;
  1250. _pageHeader* _curPage;
  1251. _pageHeader* _firstRawItem;
  1252. _pageHeader* _curRawItem;
  1253. int _pageSize;
  1254. int _curIndex;
  1255. StringPool(int pageSize = 4096);
  1256. ~StringPool();
  1257. char* beginAdd(int sz) {
  1258. #ifdef STRINGPOOL_MEMORY_DIAG
  1259. int length = sz + sizeof(int) + 1;
  1260. #else
  1261. int length = sz + 1;
  1262. #endif
  1263. if (likely(_curPage!=NULL && length<=(_pageSize - (int) sizeof(_pageHeader) - _curIndex))) {
  1264. sssss:
  1265. //insert null byte at the end
  1266. *(((char*) (_curPage + 1)) + _curIndex + sz) = 0;
  1267. #ifdef STRINGPOOL_MEMORY_DIAG
  1268. int* ptr = (int*) (((char*) (_curPage + 1)) + _curIndex);
  1269. *ptr = sz;
  1270. memset(ptr + 1, 0xff, sz);
  1271. return (char*) (ptr + 1);
  1272. #else
  1273. return ((char*) (_curPage + 1)) + _curIndex;
  1274. #endif
  1275. }
  1276. if (length > (_pageSize - (int) sizeof(_pageHeader)) / 2) {
  1277. _addRaw(length);
  1278. return ((char*) (_curRawItem + 1));
  1279. }
  1280. _addPage();
  1281. goto sssss;
  1282. }
  1283. void endAdd(int length) {
  1284. #ifdef STRINGPOOL_MEMORY_DIAG
  1285. _curIndex += length + sizeof(int) + 1;
  1286. #else
  1287. _curIndex += length + 1;
  1288. #endif
  1289. }
  1290. char* add(int length) {
  1291. char* tmp = beginAdd(length);
  1292. endAdd(length);
  1293. return tmp;
  1294. }
  1295. char* add(const char* s, int length) {
  1296. char* tmp = beginAdd(length);
  1297. memcpy(tmp, s, length);
  1298. endAdd(length);
  1299. return tmp;
  1300. }
  1301. char* add(String s) {
  1302. return add(s.data(), s.length());
  1303. }
  1304. char* add(const char* s) {
  1305. return add(s, strlen(s));
  1306. }
  1307. String addString(int length) {
  1308. return {add(length),length};
  1309. }
  1310. String addString(const char* s, int length) {
  1311. return {add(s,length),length};
  1312. }
  1313. String addString(String s) {
  1314. return {add(s.data(), s.length()),s.length()};
  1315. }
  1316. String addString(const char* s) {
  1317. int l = strlen(s);
  1318. return {add(s, l),l};
  1319. }
  1320. void* alloc(int s) override final {
  1321. return add(s);
  1322. }
  1323. void dealloc(void* obj) override final {
  1324. #ifdef STRINGPOOL_MEMORY_DIAG
  1325. int* tmp = (int*) obj;
  1326. tmp--;
  1327. memset(obj, 0xFE, *tmp);
  1328. #endif
  1329. }
  1330. void clear();
  1331. state saveState() {
  1332. return {_curPage,_curRawItem,_curIndex};
  1333. }
  1334. //deallocate all blocks allocated after s was saved
  1335. void restoreState(state s) {
  1336. _pageHeader* h;
  1337. if (s._curPage != NULL) {
  1338. h = s._curPage->next;
  1339. s._curPage->next = NULL;
  1340. while (h != NULL) {
  1341. _pageHeader* n = h->next;
  1342. ::free(h);
  1343. h = n;
  1344. }
  1345. }
  1346. if (s._curRawItem != NULL) {
  1347. h = s._curRawItem->next;
  1348. s._curRawItem->next = NULL;
  1349. while (h != NULL) {
  1350. _pageHeader* n = h->next;
  1351. ::free(h);
  1352. h = n;
  1353. }
  1354. }
  1355. _curPage = s._curPage;
  1356. _curRawItem = s._curRawItem;
  1357. _curIndex = s._curIndex;
  1358. }
  1359. void _addPage();
  1360. void _addRaw(int len);
  1361. };
  1362. template<class T>
  1363. class PoolAllocator
  1364. {
  1365. public:
  1366. typedef size_t size_type;
  1367. typedef ptrdiff_t difference_type;
  1368. typedef T* pointer;
  1369. typedef const T* const_pointer;
  1370. typedef T& reference;
  1371. typedef const T& const_reference;
  1372. typedef T value_type;
  1373. StringPool* sp;
  1374. template<class U>
  1375. struct rebind
  1376. {
  1377. typedef PoolAllocator<U> other;
  1378. };
  1379. PoolAllocator(StringPool* sp) throw () :
  1380. sp(sp) {
  1381. }
  1382. template<class U>
  1383. PoolAllocator(const PoolAllocator<U>& other) :
  1384. sp(other.sp) {
  1385. }
  1386. ~PoolAllocator() {
  1387. }
  1388. template<class U>
  1389. PoolAllocator& operator=(const PoolAllocator<U>& other) throw () {
  1390. sp = other.sp;
  1391. return *this;
  1392. }
  1393. // address
  1394. inline pointer address(reference r) {
  1395. return &r;
  1396. }
  1397. inline const_pointer address(const_reference r) {
  1398. return &r;
  1399. }
  1400. // memory allocation
  1401. inline pointer allocate(size_type cnt, typename std::allocator<void>::const_pointer = 0) {
  1402. int size = cnt * sizeof(T);
  1403. pointer p = (pointer) sp->add(size);
  1404. //printf("allocate(size=%i): %p\n", size, p);
  1405. return p;
  1406. }
  1407. inline void deallocate(pointer p, size_type) {
  1408. }
  1409. // size
  1410. inline size_type max_size() const {
  1411. return std::numeric_limits<size_type>::max() / sizeof(T);
  1412. }
  1413. // construction/destruction
  1414. inline void construct(pointer p, const T& t) {
  1415. new (p) T(t);
  1416. }
  1417. inline void destroy(pointer p) {
  1418. p->~T();
  1419. }
  1420. inline bool operator==(PoolAllocator const&) {
  1421. return true;
  1422. }
  1423. inline bool operator!=(PoolAllocator const& a) {
  1424. return !operator==(a);
  1425. }
  1426. };
  1427. class StringStream: public Stream, public BufferedOutput, public MemoryBuffer
  1428. {
  1429. public:
  1430. StringStream(const StringStream& other) = delete;
  1431. StringStream& operator=(const StringStream& other) = delete;
  1432. //uint8_t* data;
  1433. //int len, pos;
  1434. string _str;
  1435. int len;
  1436. StringStream();
  1437. //sync
  1438. virtual int32_t read(void* buf, int32_t len);
  1439. virtual int32_t readAll(void* buf, int32_t len);
  1440. virtual int32_t write(const void* buf, int32_t len);
  1441. virtual int32_t writeAll(const void* buf, int32_t len);
  1442. //async
  1443. virtual void read(void* buf, int32_t len, const Callback& cb, bool repeat = false);
  1444. virtual void readAll(void* buf, int32_t len, const Callback& cb);
  1445. virtual void write(const void* buf, int32_t len, const Callback& cb, bool repeat = false);
  1446. virtual void writeAll(const void* buf, int32_t len, const Callback& cb);
  1447. virtual void cancelRead();
  1448. virtual void cancelWrite();
  1449. //=========misc===========
  1450. //sync
  1451. virtual void close();
  1452. virtual void flush();
  1453. //async
  1454. virtual void close(const Callback& cb);
  1455. virtual void flush(const Callback& cb);
  1456. virtual int32_t readBuffer(void*& buf, int32_t maxlen);
  1457. virtual void flushBuffer(int minBufferAllocation);
  1458. virtual BufferedOutput* getBufferedOutput() override;
  1459. void clear();
  1460. inline string str() const {
  1461. return _str.substr(0, len);
  1462. }
  1463. inline virtual uint8_t* data() const final {
  1464. return buffer;
  1465. }
  1466. inline virtual int length() const final {
  1467. return len;
  1468. }
  1469. CP::String beginAppend(int minLen) {
  1470. _str.reserve(bufferPos + minLen);
  1471. _str.resize(_str.capacity());
  1472. this->buffer = (uint8_t*) _str.data();
  1473. return {(char*)buffer+bufferPos,bufferSize-bufferPos};
  1474. }
  1475. void endAppend(int len) {
  1476. if (bufferPos + len > bufferSize) throw overflow_error(
  1477. "invalid length parameter to MemoryStream::endAppend()");
  1478. bufferPos += len;
  1479. if (bufferPos > this->len) this->len = bufferPos;
  1480. }
  1481. };
  1482. void listDirectory(const char* path, Delegate<void(const char*)> cb);
  1483. class MemoryPool: public RGC::Allocator
  1484. {
  1485. public:
  1486. struct _item
  1487. {
  1488. _item* nextFree;
  1489. };
  1490. _item* _freeList;
  1491. _item* _lastFree;
  1492. int size;
  1493. int items;
  1494. int maxItems;
  1495. MemoryPool(int size, int maxItems = 1024);
  1496. MemoryPool(const MemoryPool& other) = delete;
  1497. ~MemoryPool();
  1498. void* alloc();
  1499. void* alloc(int s) override;
  1500. void dealloc(void* obj) override;
  1501. MemoryPool& operator=(const MemoryPool& other) = delete;
  1502. };
  1503. //all mutexes are assumed to be recursive
  /// Abstract lock interface. The virtual destructor makes it safe to
  /// destroy an implementation through a Mutex pointer.
  class Mutex
  {
  public:
      virtual void lock()=0;
      virtual void unlock()=0;
      virtual ~Mutex() {
      }
  };
  1510. class ScopeLock
  1511. {
  1512. public:
  1513. Mutex* mutex;
  1514. ScopeLock(Mutex& m) :
  1515. mutex(&m) {
  1516. m.lock();
  1517. }
  1518. ScopeLock(const ScopeLock& l) = delete;
  1519. void earlyUnlock() {
  1520. if (mutex != nullptr) mutex->unlock();
  1521. mutex = nullptr;
  1522. }
  1523. ~ScopeLock() {
  1524. if (mutex != nullptr) mutex->unlock();
  1525. }
  1526. ScopeLock& operator=(const ScopeLock& other) = delete;
  1527. };
  1528. class PThreadMutex: public Mutex
  1529. {
  1530. public:
  1531. pthread_mutex_t m;
  1532. PThreadMutex();
  1533. ~PThreadMutex();
  1534. void lock() override;
  1535. void unlock() override;
  1536. };
  1537. //reference counted buffer
  1538. class Buffer: public String
  1539. {
  1540. public:
  1541. typedef int refc_t;
  1542. refc_t* pbuf;
  1543. inline void __inc() {
  1544. ++*pbuf;
  1545. }
  1546. inline void __dec_autofree() {
  1547. if (--*pbuf <= 0) free(pbuf);
  1548. }
  1549. inline Buffer() :
  1550. pbuf(NULL) {
  1551. }
  1552. Buffer(refc_t* orig, void* buf, int length) :
  1553. String((char*) buf, length), pbuf(orig) {
  1554. __inc();
  1555. }
  1556. inline void release() {
  1557. if (pbuf != NULL) __dec_autofree();
  1558. pbuf = NULL;
  1559. d = NULL;
  1560. len = 0;
  1561. }
  1562. Buffer(int length) :
  1563. String((char*) NULL, length) {
  1564. if (length <= 0) {
  1565. this->pbuf = NULL;
  1566. return;
  1567. }
  1568. this->pbuf = (refc_t*) malloc(length + sizeof(refc_t));
  1569. if (this->pbuf == NULL) throw bad_alloc();
  1570. *this->pbuf = 1;
  1571. this->d = (char*) (this->pbuf + 1);
  1572. }
  1573. Buffer(const Buffer& b) :
  1574. String(b), pbuf(b.pbuf) {
  1575. if (b.pbuf != NULL) __inc();
  1576. }
  1577. Buffer& operator=(const Buffer& b) {
  1578. String::operator=(b);
  1579. if (pbuf != NULL) __dec_autofree();
  1580. if ((pbuf = b.pbuf) != NULL) __inc();
  1581. return *this;
  1582. }
  1583. inline ~Buffer() {
  1584. if (pbuf != NULL) __dec_autofree();
  1585. }
  1586. inline Buffer subBuffer(int index, int length) const {
  1587. if (index < 0 || index + length > this->len || length < 0) throw range_error(
  1588. "Buffer::subBuffer() out of range");
  1589. return Buffer(pbuf, d + index, length);
  1590. }
  1591. inline Buffer subBuffer(int index) const {
  1592. return subBuffer(index, len - index);
  1593. }
  1594. };
  /**
   * Fixed-size array of bits packed into 32-bit words.
   * All bits start cleared. get()/set() do not check their index.
   */
  class BitArray
  {
  public:
    uint32_t* data; // packed storage: bit i lives in data[i / 32], position i % 32
    int length;     // number of bits requested at construction

    /**
     * Allocates zeroed storage for l bits, rounding the word count UP so that
     * every index in [0, l) has backing storage. l <= 0 allocates no words.
     */
    BitArray(int l) :
        data(new uint32_t[wordCount(l)]()), length(l) {
    }
    // Deep copy: the new array gets its own storage, so the two objects never
    // delete the same block.
    BitArray(const BitArray& other) :
        data(new uint32_t[wordCount(other.length)]()), length(other.length) {
      const int words = wordCount(other.length);
      for (int w = 0; w < words; w++) data[w] = other.data[w];
    }
    // Deep copy assignment; allocates before releasing so a failed allocation
    // leaves this object untouched.
    BitArray& operator=(const BitArray& other) {
      if (this != &other) {
        const int words = wordCount(other.length);
        uint32_t* fresh = new uint32_t[words]();
        for (int w = 0; w < words; w++) fresh[w] = other.data[w];
        delete[] data;
        data = fresh;
        length = other.length;
      }
      return *this;
    }
    ~BitArray() {
      delete[] data;
    }
    // Returns bit i.
    inline bool get(int i) const { //WARNING: no runtime check
      return (data[i / 32] & mask(i)) != 0;
    }
    // Sets bit i to b. No runtime check of i.
    inline void set(int i, bool b) {
      if (b) data[i / 32] |= mask(i);
      else data[i / 32] &= ~mask(i);
    }
  private:
    // Number of 32-bit words needed to hold the given number of bits.
    static int wordCount(int bits) {
      if (bits <= 0) return 0;
      return bits / 32 + (bits % 32 != 0 ? 1 : 0);
    }
    // Single-bit mask for index i within its word; unsigned so that bit 31 is
    // a well-defined shift.
    static uint32_t mask(int i) {
      return (uint32_t) 1 << (i % 32);
    }
  };
  1618. template<class T> class CircularQueue: public RGC::Object
  1619. {
  1620. public:
  1621. T* array;
  1622. BitArray b;
  1623. int size;
  1624. int objsize;
  1625. int __wrap;
  1626. int s1, s2, e1, e2;
  1627. CircularQueue(int size, int objsize = 1) :
  1628. b(size), size(size), objsize(objsize), __wrap(size * 2), s1(0), s2(0), e1(0), e2(0) {
  1629. array = new T[size * objsize];
  1630. }
  1631. ~CircularQueue() {
  1632. delete[] array;
  1633. }
  1634. inline int __getlength(int i1, int i2, int wrap) {
  1635. return (i2 < i1 ? i2 + wrap : i2) - i1;
  1636. }
  1637. inline T& getPointer(int i) {
  1638. i %= size;
  1639. if (i >= size || i < 0) throw out_of_range("CircularQueue::getPointer() out of range");
  1640. return array[i * objsize]; //__intwrap(i,size);
  1641. }
  1642. inline bool canAppend() {
  1643. return __getlength(s1, e2, __wrap) < size;
  1644. }
  1645. int beginAppend() {
  1646. if (__getlength(s1, e2, __wrap) >= size) return -1;
  1647. int tmp = e2++;
  1648. e2 %= __wrap;
  1649. b.set(tmp % size, true);
  1650. return tmp;
  1651. }
  1652. void endAppend(int i) {
  1653. if (i == e1) {
  1654. do {
  1655. e1++;
  1656. e1 %= __wrap;
  1657. } while (__getlength(e1, e2, __wrap) > 0 && !(b.get(e1 % size)));
  1658. } else b.set(i % size, false);
  1659. }
  1660. inline int length() {
  1661. return __getlength(s2, e1, __wrap);
  1662. }
  1663. inline bool canDequeue() {
  1664. return __getlength(s2, e1, __wrap) > 0;
  1665. }
  1666. int beginDequeue() {
  1667. if (__getlength(s2, e1, __wrap) <= 0) return -1;
  1668. int tmp = s2++;
  1669. s2 %= __wrap;
  1670. b.set(tmp % size, true);
  1671. return tmp;
  1672. }
  1673. void endDequeue(int i) {
  1674. if (i == s1) {
  1675. do {
  1676. s1++;
  1677. s1 %= __wrap;
  1678. } while (__getlength(s1, s2, __wrap) > 0 && !(b.get(s1 % size)));
  1679. } else b.set(i % size, false);
  1680. }
  1681. };
  1682. }
  /**
   * Computes the sdbm hash of the first len bytes at str.
   * The input is only read, so it is taken as a pointer to const; callers
   * holding a non-const pointer can still pass it unchanged.
   * Returns 0 when len <= 0.
   */
  static inline unsigned long sdbm(const uint8_t* str, int len) {
    unsigned long hash = 0;
    for (int i = 0; i < len; i++) {
      // Equivalent to hash * 65599 + byte, written with shifts.
      hash = str[i] + (hash << 6) + (hash << 16) - hash;
    }
    return hash;
  }
  1692. namespace std
  1693. {
  1694. template<>
  1695. struct hash<CP::String>
  1696. {
  1697. size_t operator()(const ::CP::String& val) const {
  1698. return sdbm((uint8_t*) val.data(), val.length());
  1699. }
  1700. };
  1701. }
  1702. #endif