socketd_client.H 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. /*
  2. This program is free software: you can redistribute it and/or modify
  3. it under the terms of the GNU General Public License as published by
  4. the Free Software Foundation, either version 3 of the License, or
  5. (at your option) any later version.
  6. This program is distributed in the hope that it will be useful,
  7. but WITHOUT ANY WARRANTY; without even the implied warranty of
  8. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  9. GNU General Public License for more details.
  10. You should have received a copy of the GNU General Public License
  11. along with this program. If not, see <http://www.gnu.org/licenses/>.
  12. * */
  13. /*
  14. * socketd_client.H
  15. *
  16. * Created on: Mar 8, 2013
  17. * Author: xaxaxa
  18. */
  19. #ifndef SOCKETD_CLIENT_H_
  20. #define SOCKETD_CLIENT_H_
  21. #include <socketd.H>
  22. #include <cpoll/cpoll.H>
  23. #include <unistd.h>
  24. #include <cpoll/sendfd.H>
  25. #include <functional>
  26. #include <sstream>
  27. #define CONCAT(X) (((stringstream&)(stringstream()<<X)).str())
  28. namespace socketd
  29. {
  30. using namespace std;
  31. using namespace CP;
  32. class SocketProxy: public CP::Socket
  33. {
  34. public:
  35. uint8_t* buf;
  36. int bufPos, bufLen;
  37. SocketProxy(int fd, int d, int t, int p, int buflen) :
  38. CP::Socket(fd, d, t, p), bufPos(0), bufLen(buflen) {
  39. buf = new uint8_t[buflen];
  40. }
  41. int32_t tryFixRead(void* buf, int32_t len) {
  42. if (this->buf == NULL || bufPos >= bufLen) return -1;
  43. int32_t l = len > (bufLen - bufPos) ? (bufLen - bufPos) : len;
  44. if (l <= 0) return 0;
  45. memcpy(buf, this->buf + bufPos, l);
  46. bufPos += l;
  47. if (bufPos >= bufLen) {
  48. delete[] this->buf;
  49. this->buf = NULL;
  50. }
  51. return l;
  52. }
  53. int32_t read(void* buf, int32_t len) {
  54. int32_t r;
  55. if ((r = tryFixRead(buf, len)) == -1) return CP::Socket::read(buf, len);
  56. else return r;
  57. }
  58. int32_t recv(void* buf, int32_t len, int32_t flags = 0) {
  59. int32_t r;
  60. if ((r = tryFixRead(buf, len)) == -1) CP::Socket::recv(buf, len, flags);
  61. else {
  62. if (flags & MSG_WAITALL) {
  63. if (r < len) {
  64. int32_t tmp = CP::Socket::recv(((uint8_t*) buf) + r, len - r, flags);
  65. if (tmp > 0) r += tmp;
  66. }
  67. }
  68. return r;
  69. }
  70. }
  71. void read(void* buf, int32_t len, const Callback& cb, bool repeat = false) {
  72. int32_t r;
  73. if ((r = tryFixRead(buf, len)) == -1) CP::Socket::read(buf, len, cb, repeat);
  74. else {
  75. cb(r);
  76. if (repeat && r > 0) CP::Socket::read(buf, len, cb, true);
  77. }
  78. }
  79. void recv(void* buf, int32_t len, int32_t flags, const Callback& cb, bool repeat = false) {
  80. int32_t r;
  81. if ((r = tryFixRead(buf, len)) == -1) CP::Socket::recv(buf, len, flags, cb, repeat);
  82. else {
  83. //MSG_WAITALL is not supposed to be specified on an asynchoronous CP::Socket::recv() call
  84. /*
  85. if (flags & MSG_WAITALL) {
  86. if (r < len) {
  87. CP::Socket::recv(((uint8_t*) buf) + r, len - r, flags,
  88. [cb,r,repeat,buf,len,flags,this](int i)
  89. {
  90. int r1=r;
  91. if(i>0)r1+=i;
  92. cb(r1);
  93. if(repeat)CP::Socket::recv(buf, len, flags, cb, true);
  94. }, false);
  95. return;
  96. }
  97. }*/
  98. cb(r);
  99. if (repeat) CP::Socket::recv(buf, len, flags, cb, true);
  100. }
  101. }
  102. ~SocketProxy() {
  103. if (buf != NULL) delete[] buf;
  104. }
  105. };
  106. class socketd_client
  107. {
  108. public:
  109. CP::Poll& p;
  110. RGC::Ref<CP::Socket> sock;
  111. Delegate<void(socketd_client&, Socket*, int64_t id)> cb;
  112. protocolHeader ph;
  113. prot_handleConnection ph1;
  114. bool raw;
  115. /*vector<int> acks;
  116. uint8_t* tmp;
  117. int tmplen;
  118. bool writing;
  119. void startWrite() {
  120. if(writing || acks.size()<=0)return;
  121. int sz=sizeof(protocolHeader)+sizeof(prot_ackConnection);
  122. int sz1=sz*acks.size();
  123. if(tmplen<sz1) {
  124. if(tmp!=NULL)free(tmp);
  125. tmplen=sz1;
  126. tmp=(uint8_t*)malloc(sz1);
  127. }
  128. for(int i=0;i<acks.size();i++) {
  129. protocolHeader* ph=(protocolHeader*)(tmp+(sz*i));
  130. prot_ackConnection* ack=(prot_ackConnection*)(ph+1);
  131. ph->type=protocolHeader::ackConnection;
  132. ack->id=acks[i];
  133. ack->success=true;
  134. }
  135. acks.resize(0);
  136. writing=true;
  137. sock->write(tmp,sz1,[this](int r) {
  138. writing=false;
  139. if(r<=0)return;
  140. startWrite();
  141. });
  142. }*/
  143. void startRead();
  144. void ack(int64_t id) {
  145. if (raw) return;
  146. protocolHeader ph;
  147. memset(&ph, 0, sizeof(ph));
  148. ph.type = protocolHeader::ackConnection;
  149. prot_ackConnection ack;
  150. memset(&ack, 0, sizeof(ack));
  151. ack.id = id;
  152. ack.success = true;
  153. if (this->sock->send(&ph, sizeof(ph), MSG_DONTWAIT) != sizeof(ph)) throw runtime_error(
  154. "unix socket buffer overflow");
  155. if (this->sock->send(&ack, sizeof(ack), MSG_DONTWAIT) != sizeof(ack)) throw runtime_error(
  156. "unix socket buffer overflow");
  157. //acks.push_back(id);
  158. //startWrite();
  159. }
  160. void handleConnectionCB(int r) {
  161. if (r <= 0) {
  162. cb(*this, (Socket*) NULL, 0);
  163. return;
  164. }
  165. int fd = recvfd(sock->handle);
  166. if (fd < 0) {
  167. cb(*this, (Socket*) NULL, 0);
  168. return;
  169. }
  170. CP::Socket* newsock;
  171. //printf("asdfg %i\n",ph1.bufferLen);
  172. if (ph1.bufferLen <= 0) {
  173. newsock = new CP::Socket(fd, ph1.d, ph1.t, ph1.p);
  174. } else {
  175. SocketProxy* tmps;
  176. tmps = new SocketProxy(fd, ph1.d, ph1.t, ph1.p, ph1.bufferLen);
  177. int r = sock->recv(tmps->buf, ph1.bufferLen, MSG_WAITALL);
  178. if (r <= 0) {
  179. cb(*this, (Socket*) NULL, 0);
  180. return;
  181. }
  182. newsock = tmps;
  183. /*char junk[ph1.bufferLen];
  184. sock->recv(junk, ph1.bufferLen, MSG_WAITALL);
  185. newsock = new CP::Socket(fd, ph1.d, ph1.t, ph1.p);*/
  186. }
  187. p.add(*newsock);
  188. int64_t id = ph1.id;
  189. //printf("aaaaa %lli %i %i %i\n",ph1.id, ph1.d, ph1.t, ph1.p);
  190. cb(*this, newsock, id);
  191. newsock->release();
  192. startRead();
  193. }
  194. void readCB(int r) {
  195. //printf("readCB: this=%p\n",(void*)this);
  196. if (r <= 0) {
  197. cb(*this, (Socket*) NULL, 0);
  198. return;
  199. }
  200. if (r != sizeof(ph)) throw runtime_error(
  201. CONCAT("attempting to read protocolHeader resulted in short read: r=" << r) );
  202. switch (ph.type) {
  203. case protocolHeader::handleConnection:
  204. {
  205. sock->read(&ph1, sizeof(ph1),
  206. CP::Callback(&socketd_client::handleConnectionCB, this));
  207. return;
  208. }
  209. default:
  210. {
  211. throw runtime_error(CONCAT("unrecognized protocolHeader.type " << ph.type) );
  212. }
  213. }
  214. startRead();
  215. }
  216. socketd_client(CP::Poll& p, const Delegate<void(socketd_client&, Socket*, int64_t)>& cb,
  217. CP::Socket* sock = NULL) :
  218. p(p), cb(cb), raw(false)/*, tmp(NULL),tmplen(0),writing(false)*/{
  219. if (sock == NULL) {
  220. char* listen = getenv("SOCKETD_LISTEN");
  221. if (listen != NULL) {
  222. const char* aaa = (const char*) memchr(listen, ':', strlen(listen));
  223. if (aaa == NULL) throw runtime_error("expected \":\" in SOCKETD_LISTEN");
  224. int i = aaa - listen;
  225. sock = RGC::newObj<CP::Socket>();
  226. sock->bind(string(listen, i).c_str(),
  227. string(listen + i + 1, strlen(listen) - i - 1).c_str(), AF_UNSPEC,
  228. SOCK_STREAM);
  229. p.add(*sock);
  230. struct CB1
  231. {
  232. socketd_client* This;
  233. Delegate<void(socketd_client&, Socket*, int64_t)> cb;
  234. int64_t id;
  235. void operator()(Socket* s) {
  236. This->p.add(*s);
  237. cb(*This, s, (++id));
  238. s->release();
  239. }
  240. }*cb1 = new CB1 { this, cb, 0 };
  241. sock->listen();
  242. raw = true;
  243. sock->repeatAccept(cb1);
  244. return;
  245. }
  246. char* tmp;
  247. tmp = getenv("SOCKETD_FD");
  248. if (tmp == NULL) throw logic_error("environment \"SOCKETD_FD\" not set");
  249. sock = RGC::newObj<CP::Socket>(atoi(tmp), AF_UNIX, SOCK_STREAM, 0);
  250. p.add(*sock);
  251. }
  252. this->sock = sock;
  253. startRead();
  254. }
  255. };
  256. void socketd_client::startRead() {
  257. //memset(&ph, 0, sizeof(ph));
  258. //printf("startRead: this=%p\n",(void*)this);
  259. sock->recv(&ph, sizeof(ph), 0, CP::Callback(&socketd_client::readCB, this));
  260. }
  261. }
  262. #endif /* SOCKETD_CLIENT_H_ */