update 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. <%!-lmysqlclient%><%@ class mypage %><%#
  2. #include <mysql/mysql.h>
  3. #include <json/json.h>
  4. #include <cpoll/threadpool.H>
  5. #include <list>
  6. #include "connectioninfo.H"
  7. #include "generic_pool.H"
  8. #include "world.H"
  9. using namespace CP;
  10. using namespace cppsp;
  11. using namespace std;
  12. class myStatement
  13. {
  14. public:
  15. MYSQL* db;
  16. int rn; //input param "id" for select statement
  17. int r; //output param "randomNumber" for select statement
  18. long unsigned int len1; //output length of "randomNumber" for select statement
  19. int update_rn; //input param "randomNumber" for update statement
  20. int update_id; //input param "id" for update statement
  21. MYSQL_STMT* stmt;
  22. MYSQL_STMT* stmt_update;
  23. MYSQL_BIND param[1], results[1];
  24. MYSQL_BIND param_update[2];
  25. myStatement() {
  26. db=doConnect(NULL);
  27. //select statement
  28. stmt=mysql_stmt_init(db);
  29. const char* sql="SELECT randomNumber FROM World WHERE id = ?";
  30. if(mysql_stmt_prepare(stmt,sql,strlen(sql)))
  31. throw runtime_error(mysql_stmt_error(stmt));
  32. memset(param, 0, sizeof(param));
  33. memset(results, 0, sizeof(results));
  34. param[0].buffer_type = MYSQL_TYPE_LONG;
  35. param[0].buffer = (char *)&rn;
  36. param[0].buffer_length = sizeof(rn);
  37. param[0].is_null = 0;
  38. param[0].length = NULL;
  39. results[0].buffer_type= MYSQL_TYPE_LONG;
  40. results[0].buffer = &r;
  41. results[0].buffer_length = sizeof(r);
  42. results[0].is_null = 0;
  43. results[0].length = &len1;
  44. if(mysql_stmt_bind_param(stmt,param)) throw runtime_error(mysql_stmt_error(stmt));
  45. if(mysql_stmt_bind_result(stmt, results)) throw runtime_error(mysql_stmt_error(stmt));
  46. //update statement
  47. stmt_update=mysql_stmt_init(db);
  48. sql="update World set randomNumber=? WHERE id = ?";
  49. if(mysql_stmt_prepare(stmt_update,sql,strlen(sql)))
  50. throw runtime_error(mysql_stmt_error(stmt_update));
  51. memset(param_update, 0, sizeof(param_update));
  52. param_update[0].buffer_type = MYSQL_TYPE_LONG;
  53. param_update[0].buffer = (char *)&update_rn;
  54. param_update[0].buffer_length = sizeof(update_rn);
  55. param_update[0].is_null = 0;
  56. param_update[0].length = NULL;
  57. param_update[1].buffer_type = MYSQL_TYPE_LONG;
  58. param_update[1].buffer = (char *)&update_id;
  59. param_update[1].buffer_length = sizeof(update_id);
  60. param_update[1].is_null = 0;
  61. param_update[1].length = NULL;
  62. if(mysql_stmt_bind_param(stmt_update,param_update))
  63. throw runtime_error(mysql_stmt_error(stmt_update));
  64. }
  65. ~myStatement() {
  66. mysql_stmt_close(stmt);
  67. mysql_stmt_close(stmt_update);
  68. doDisconnect(NULL,db);
  69. }
  70. };
  71. myStatement* cStatement(void*) {
  72. return new myStatement();
  73. }
  74. void dStatement(void*, myStatement* s) {
  75. delete s;
  76. }
  77. genericPool<myStatement*,128> stmtPool(&cStatement,&dStatement);
  78. ThreadPool tp(32);
  79. int curOperations=0;
  80. static const int maxOperations=MYSQL_MAX_CONNECTIONS;
  81. list<EventFD*> waitingThreads;
  82. void do_init_thread(void*) {
  83. mysql_thread_init();
  84. }
  85. void do_deinit_thread(void*) {
  86. mysql_thread_end();
  87. }
  88. extern "C" void initModule() {
  89. mysql_library_init(0,NULL,NULL);
  90. tp.afterStart=&do_init_thread;
  91. tp.beforeExit=&do_deinit_thread;
  92. }
  93. %><%$
  94. EventFD efd{0,EFD_SEMAPHORE};
  95. int queries=1;
  96. world* items;
  97. myStatement* stmt;
  98. bool err=false;
  99. string errmsg;
  100. void tpFunc() {
  101. //mysql_thread_init();
  102. try {
  103. for (int i=0;i<queries;i++){
  104. items[i].id=stmt->rn=rand()%10000; //id
  105. if(mysql_stmt_execute(stmt->stmt)) throw runtime_error(mysql_stmt_error(stmt->stmt));
  106. if(mysql_stmt_fetch(stmt->stmt)==0) {
  107. while(mysql_stmt_fetch(stmt->stmt)==0);
  108. //update to new random number
  109. stmt->update_rn=rand()%10000; //new random number
  110. stmt->update_id=stmt->rn; //id
  111. if(mysql_stmt_execute(stmt->stmt_update))
  112. throw runtime_error(mysql_stmt_error(stmt->stmt_update));
  113. items[i].rnd=stmt->update_rn;
  114. } else {
  115. items[i].rnd=0;
  116. }
  117. }
  118. }catch(exception& ex) {
  119. err=true;
  120. errmsg=ex.what();
  121. }
  122. efd.sendEvent(1);
  123. }
  124. //asynchronously load the data in the doInit() function, and defer page rendering until data is available
  125. void doInit() override {
  126. if(unlikely(curOperations>=maxOperations)) {
  127. //too many threads. need to wait.
  128. waitingThreads.push_back(&efd);
  129. efd.getEvent({&mypage::waitCB,this});
  130. poll->add(efd);
  131. return;
  132. }
  133. curOperations++;
  134. auto it=request->queryString.find("queries");
  135. if(it!=request->queryString.end()) {
  136. queries=atoi((*it).second);
  137. }
  138. if(queries<1)queries=1;
  139. if(queries>500)queries=500;
  140. int i;
  141. items=(world*)sp->alloc(sizeof(world)*queries);
  142. stmt=stmtPool.get();
  143. poll->add(efd);
  144. tp.invoke({&mypage::tpFunc,this});
  145. efd.getEvent({&mypage::efdCB,this});
  146. }
  147. void efdCB(eventfd_t efdVal) {
  148. curOperations--;
  149. if(curOperations<maxOperations) {
  150. if(unlikely(!waitingThreads.empty())) {
  151. waitingThreads.front()->sendEvent(1);
  152. waitingThreads.pop_front();
  153. }
  154. }
  155. stmtPool.put(stmt);
  156. Page::doInit();
  157. }
  158. void waitCB(eventfd_t efdVal) {
  159. this->doInit();
  160. }
  161. %>[<%
  162. if(err) throw runtime_error(errmsg);
  163. for (int i=0;i<queries;i++){
  164. if(i>0) output.write(',');
  165. %>{"id":<%=items[i].id%>,"randomNumber":<%=items[i].rnd%>}<%
  166. }
  167. response->headers["Content-Type"]="application/json";
  168. response->headers["Server"]="cppsp/0.2";
  169. %>]