123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
- <%!-lmysqlclient%><%@ class mypage %><%#
- #include <mysql/mysql.h>
- #include <json/json.h>
- #include <cpoll/threadpool.H>
- #include <list>
- #include "connectioninfo.H"
- #include "generic_pool.H"
- #include "world.H"
- using namespace CP;
- using namespace cppsp;
- using namespace std;
- class myStatement
- {
- public:
- MYSQL* db;
- int rn; //input param "id" for select statement
- int r; //output param "randomNumber" for select statement
- long unsigned int len1; //output length of "randomNumber" for select statement
- int update_rn; //input param "randomNumber" for update statement
- int update_id; //input param "id" for update statement
- MYSQL_STMT* stmt;
- MYSQL_STMT* stmt_update;
- MYSQL_BIND param[1], results[1];
- MYSQL_BIND param_update[2];
- myStatement() {
- db=doConnect(NULL);
-
- //select statement
- stmt=mysql_stmt_init(db);
- const char* sql="SELECT randomNumber FROM World WHERE id = ?";
- if(mysql_stmt_prepare(stmt,sql,strlen(sql)))
- throw runtime_error(mysql_stmt_error(stmt));
- memset(param, 0, sizeof(param));
- memset(results, 0, sizeof(results));
- param[0].buffer_type = MYSQL_TYPE_LONG;
- param[0].buffer = (char *)&rn;
- param[0].buffer_length = sizeof(rn);
- param[0].is_null = 0;
- param[0].length = NULL;
- results[0].buffer_type= MYSQL_TYPE_LONG;
- results[0].buffer = &r;
- results[0].buffer_length = sizeof(r);
- results[0].is_null = 0;
- results[0].length = &len1;
- if(mysql_stmt_bind_param(stmt,param)) throw runtime_error(mysql_stmt_error(stmt));
- if(mysql_stmt_bind_result(stmt, results)) throw runtime_error(mysql_stmt_error(stmt));
-
- //update statement
- stmt_update=mysql_stmt_init(db);
- sql="update World set randomNumber=? WHERE id = ?";
- if(mysql_stmt_prepare(stmt_update,sql,strlen(sql)))
- throw runtime_error(mysql_stmt_error(stmt_update));
- memset(param_update, 0, sizeof(param_update));
- param_update[0].buffer_type = MYSQL_TYPE_LONG;
- param_update[0].buffer = (char *)&update_rn;
- param_update[0].buffer_length = sizeof(update_rn);
- param_update[0].is_null = 0;
- param_update[0].length = NULL;
- param_update[1].buffer_type = MYSQL_TYPE_LONG;
- param_update[1].buffer = (char *)&update_id;
- param_update[1].buffer_length = sizeof(update_id);
- param_update[1].is_null = 0;
- param_update[1].length = NULL;
- if(mysql_stmt_bind_param(stmt_update,param_update))
- throw runtime_error(mysql_stmt_error(stmt_update));
- }
- ~myStatement() {
- mysql_stmt_close(stmt);
- mysql_stmt_close(stmt_update);
- doDisconnect(NULL,db);
- }
- };
- myStatement* cStatement(void*) {
- return new myStatement();
- }
- void dStatement(void*, myStatement* s) {
- delete s;
- }
- genericPool<myStatement*,128> stmtPool(&cStatement,&dStatement);
- ThreadPool tp(32);
- int curOperations=0;
- static const int maxOperations=MYSQL_MAX_CONNECTIONS;
- list<EventFD*> waitingThreads;
- void do_init_thread(void*) {
- mysql_thread_init();
- }
- void do_deinit_thread(void*) {
- mysql_thread_end();
- }
- extern "C" void initModule() {
- mysql_library_init(0,NULL,NULL);
- tp.afterStart=&do_init_thread;
- tp.beforeExit=&do_deinit_thread;
- }
- %><%$
- EventFD efd{0,EFD_SEMAPHORE};
- int queries=1;
- world* items;
- myStatement* stmt;
- bool err=false;
- string errmsg;
- void tpFunc() {
- //mysql_thread_init();
- try {
- for (int i=0;i<queries;i++){
- items[i].id=stmt->rn=rand()%10000; //id
- if(mysql_stmt_execute(stmt->stmt)) throw runtime_error(mysql_stmt_error(stmt->stmt));
- if(mysql_stmt_fetch(stmt->stmt)==0) {
- while(mysql_stmt_fetch(stmt->stmt)==0);
- //update to new random number
- stmt->update_rn=rand()%10000; //new random number
- stmt->update_id=stmt->rn; //id
- if(mysql_stmt_execute(stmt->stmt_update))
- throw runtime_error(mysql_stmt_error(stmt->stmt_update));
- items[i].rnd=stmt->update_rn;
- } else {
- items[i].rnd=0;
- }
- }
- }catch(exception& ex) {
- err=true;
- errmsg=ex.what();
- }
- efd.sendEvent(1);
- }
- //asynchronously load the data in the doInit() function, and defer page rendering until data is available
- void doInit() override {
- if(unlikely(curOperations>=maxOperations)) {
- //too many threads. need to wait.
- waitingThreads.push_back(&efd);
- efd.getEvent({&mypage::waitCB,this});
- poll->add(efd);
- return;
- }
- curOperations++;
- auto it=request->queryString.find("queries");
- if(it!=request->queryString.end()) {
- queries=atoi((*it).second);
- }
- if(queries<1)queries=1;
- if(queries>500)queries=500;
- int i;
-
- items=(world*)sp->alloc(sizeof(world)*queries);
- stmt=stmtPool.get();
- poll->add(efd);
- tp.invoke({&mypage::tpFunc,this});
- efd.getEvent({&mypage::efdCB,this});
- }
- void efdCB(eventfd_t efdVal) {
- curOperations--;
- if(curOperations<maxOperations) {
- if(unlikely(!waitingThreads.empty())) {
- waitingThreads.front()->sendEvent(1);
- waitingThreads.pop_front();
- }
- }
- stmtPool.put(stmt);
- Page::doInit();
- }
- void waitCB(eventfd_t efdVal) {
- this->doInit();
- }
%>[<%
    //A failure recorded by tpFunc is rethrown here instead of rendering rows.
    if(err) throw runtime_error(errmsg);
    for (int idx=0; idx<queries; ++idx) {
        //every element after the first is preceded by a comma
        if(idx!=0) output.write(',');
        %>{"id":<%=items[idx].id%>,"randomNumber":<%=items[idx].rnd%>}<%
    }
    response->headers["Content-Type"]="application/json";
    response->headers["Server"]="cppsp/0.2";
%>]
|