http.C 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. /***************************************************************************
  2. *
  3. * copyright notice: some code is copied from cURL, some code is my own
  4. * _ _ ____ _
  5. * Project ___| | | | _ \| |
  6. * / __| | | | |_) | |
  7. * | (__| |_| | _ <| |___
  8. * \___|\___/|_| \_\_____|
  9. *
  10. * Copyright (C) 1998 - 2011, Daniel Stenberg, <daniel@haxx.se>, et al.
  11. *
  12. * This software is licensed as described in the file COPYING, which
  13. * you should have received as part of this distribution. The terms
  14. * are also available at http://curl.haxx.se/docs/copyright.html.
  15. *
  16. * You may opt to use, copy, modify, merge, publish, distribute and/or sell
  17. * copies of the Software, and permit persons to whom the Software is
  18. * furnished to do so, under the terms of the COPYING file.
  19. *
  20. * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
  21. * KIND, either express or implied.
  22. *
  23. * //modified by: xaxaxa
  24. *
  25. ***************************************************************************/
  26. #include "include/http.H"
  27. #include <fcntl.h>
  28. #include <sys/stat.h>
  29. #include <errno.h>
  30. #include <iostream>
  31. #include <stdio.h>
  32. #include <string.h>
  33. #include <stdlib.h>
  34. #include <sys/time.h>
  35. #include <time.h>
  36. #include <unistd.h>
  37. #include <sys/poll.h>
  38. using namespace std;
  39. using namespace CP;
  40. namespace curl
  41. {
  42. struct taskInfo;
  43. void processEvent(taskInfo* t, int i);
  44. struct taskInfo: public CP::File
  45. {
  46. instance* inst;
  47. CURL* c;
  48. int act; //from libcurl
  49. bool isAdded;
  50. taskInfo(HANDLE h) :
  51. File(h), act(0), isAdded(false) {
  52. }
  53. void inCB(int i) {
  54. processEvent(this, CURL_CSELECT_IN);
  55. }
  56. void outCB(int i) {
  57. processEvent(this, CURL_CSELECT_OUT);
  58. }
  59. void updateEvents(int act) {
  60. if (act & CURL_POLL_IN) waitForEvent(CP::Events::in, { &taskInfo::inCB, this }, true);
  61. else if (this->act & CURL_POLL_IN) cancelRead();
  62. if (act & CURL_POLL_OUT) waitForEvent(CP::Events::out, { &taskInfo::outCB, this }, true);
  63. else if (this->act & CURL_POLL_OUT) cancelWrite();
  64. this->act = act;
  65. }
  66. void setTaskInfo(instance* inst, CURL* c, int act) {
  67. this->inst = inst;
  68. this->c = c;
  69. updateEvents(act);
  70. inst->p->add(*this);
  71. this->isAdded = true;
  72. }
  73. };
  74. struct curlTaskInfo
  75. {
  76. CURL *c;
  77. function<void(CURL*, CURLcode)> cb;
  78. //void* userdata;
  79. };
  80. void checkQueue(instance* inst) {
  81. CURLMsg *msg;
  82. int msgs_left;
  83. while ((msg = curl_multi_info_read(inst->m, &msgs_left))) {
  84. if (msg->msg == CURLMSG_DONE) {
  85. CURL *c = msg->easy_handle;
  86. curlTaskInfo* t = NULL;
  87. curl_easy_getinfo(c, CURLINFO_PRIVATE, &t);
  88. //cout << "t = " << t << endl;
  89. curl_multi_remove_handle(inst->m, c);
  90. curl_easy_cleanup(c);
  91. t->cb(c, msg->data.result);
  92. delete t;
  93. }
  94. }
  95. }
  96. void processEvent(taskInfo* t, int i) {
  97. //printf("processEvent: i=%i\n", i);
  98. int num_transfers;
  99. curl_multi_socket_action(t->inst->m, t->handle, i, &num_transfers);
  100. checkQueue(t->inst);
  101. }
  102. /* CURLMOPT_SOCKETFUNCTION */
  103. int cb_sock(CURL *c, curl_socket_t s, int what, void *cbp, void *sockp) {
  104. //cout << "cb_sock()" << endl;
  105. instance* inst = (instance*) cbp;
  106. taskInfo* t = (taskInfo*) sockp;
  107. if (what == CURL_POLL_REMOVE && t != NULL) {
  108. //printf("cb_sock: remove\n");
  109. //if (t->isAdded) event_del(&t->ev);
  110. inst->p->del(*t);
  111. t->handle = -1;
  112. delete t;
  113. } else {
  114. if (t == NULL) { //add
  115. //printf("cb_sock: add\n");
  116. t = new taskInfo((HANDLE) s);
  117. t->setTaskInfo(inst, c, what);
  118. curl_multi_assign(inst->m, s, t);
  119. } else { //modify events monitored
  120. //printf("cb_sock: update\n");
  121. t->updateEvents(what);
  122. }
  123. }
  124. return 0;
  125. }
  126. void addCurlTask(instance* inst, CURL* c, const function<void(CURL*, CURLcode)>& cb) {
  127. //printf("addCurlTask\n");
  128. curlTaskInfo* t = new curlTaskInfo();
  129. t->c = c;
  130. t->cb = cb;
  131. //t->userdata=userdata;
  132. curl_easy_setopt(c, CURLOPT_PRIVATE, t);
  133. curl_multi_add_handle(inst->m, c);
  134. //int still_running;
  135. //curl_multi_perform(inst->m, &still_running);
  136. }
  137. struct transferInfo
  138. {
  139. function<bool(const void* data, int len, int state)> cb;
  140. CURL* c;
  141. };
  142. size_t cb_data(void *data, size_t size, size_t nmemb, void *userdata) {
  143. transferInfo* t = (transferInfo*) userdata;
  144. if (!t->cb(data, size * nmemb, 3)) return 0;
  145. return size * nmemb;
  146. }
  147. transferInfo* addTransfer(instance* inst, const char* url,
  148. const function<bool(const void* data, int len, int state)>& cb)
  149. /*-1:failed 1:connected 2:sent 3:recving 4:closed*/
  150. {
  151. CURL* c = curl_easy_init();
  152. curl_easy_setopt(c, CURLOPT_URL, url);
  153. transferInfo* t = new transferInfo();
  154. t->cb = cb;
  155. t->c = c;
  156. curl_easy_setopt(c, CURLOPT_WRITEDATA, t);
  157. curl_easy_setopt(c, CURLOPT_WRITEFUNCTION, cb_data);
  158. return t;
  159. }
  160. void beginTransfer(instance* inst, transferInfo* t) {
  161. addCurlTask(inst, t->c, [t](CURL* c,CURLcode res)
  162. {
  163. if(res==CURLE_OK) t->cb(NULL,0,4);
  164. else if(res!=CURLE_WRITE_ERROR) t->cb(NULL,0,-res);
  165. delete t;
  166. });
  167. }
  168. int cb_curl_timer(CURLM *m, long timeout_ms, void* userdata) { /* Update the event timer after curl_multi library calls */
  169. //printf("cb_curl_timer: timeout=%li\n", timeout_ms);
  170. instance* inst = (instance*) userdata;
  171. inst->timer.setInterval(timeout_ms < 0 ? 0 : timeout_ms);
  172. return 0;
  173. }
  174. void cb_timer(void *userdata, int count) {
  175. //printf("cb_timer: count=%i\n", count);
  176. instance* inst = (instance*) userdata;
  177. inst->timer.setInterval(0);
  178. int num_transfers;
  179. curl_multi_socket_action(inst->m, CURL_SOCKET_TIMEOUT, 0, &num_transfers);
  180. checkQueue(inst);
  181. }
  182. void newInstance(instance* inst, CP::Poll* p) {
  183. inst->m = curl_multi_init();
  184. inst->p = p;
  185. inst->timer.setCallback( { &cb_timer, (void*) inst });
  186. p->add(inst->timer);
  187. //event_assign(&inst->timer_event, inst->eb, -1, 0, cb_timer, inst);
  188. curl_multi_setopt(inst->m, CURLMOPT_SOCKETFUNCTION, cb_sock);
  189. curl_multi_setopt(inst->m, CURLMOPT_SOCKETDATA, inst);
  190. curl_multi_setopt(inst->m, CURLMOPT_TIMERFUNCTION, cb_curl_timer);
  191. curl_multi_setopt(inst->m, CURLMOPT_TIMERDATA, inst);
  192. }
  193. void dispose(instance* inst) {
  194. curl_multi_cleanup(inst->m);
  195. }
  196. void transferManager::addTransfer(const char* url, bool post,
  197. const function<bool(const void* data, int len, int state)>& cb) {
  198. if (itemsProcessing < concurrency) {
  199. doTransfer(url, post, cb);
  200. } else {
  201. q.push( { url, post, cb });
  202. }
  203. }
  204. void transferManager::checkQueue() {
  205. if (itemsProcessing < concurrency && q.size() > 0) {
  206. item& it = q.front();
  207. doTransfer(it.url.c_str(), it.post, it.cb);
  208. q.pop();
  209. }
  210. }
  211. void transferManager::doTransfer(const char* url, bool post,
  212. const function<bool(const void* data, int len, int state)>& cb) {
  213. itemsProcessing++;
  214. transferInfo* t = curl::addTransfer(&inst, url,
  215. [cb,this](const void* data, int len, int state)
  216. {
  217. if(state==4) {
  218. itemsProcessing--;
  219. checkQueue();
  220. }
  221. return cb(data, len, state);
  222. });
  223. if (post) curl_easy_setopt(t->c, CURLOPT_POST, 1);
  224. curl::beginTransfer(&inst, t);
  225. }
  226. }
  227. /*
  228. int main(int argc, char **argv)
  229. {
  230. curl::instance inst;
  231. curl::newInstance(&inst);
  232. curl::addTransfer(&inst,"http://192.168.5.11/",[](const void* data, int len, int state)
  233. {
  234. cout << len << endl;
  235. //if(data!=NULL && len>0)
  236. // write(1,data,len);
  237. return true;
  238. });
  239. curl::eventLoop(&inst);
  240. return 0;
  241. } //*/