Browse Source

cppsp: added database update test

xaxaxa 12 years ago
parent
commit
d58da7c1e9
1 changed files with 180 additions and 0 deletions
  1. 180 0
      cpoll_cppsp/www/update

+ 180 - 0
cpoll_cppsp/www/update

@@ -0,0 +1,180 @@
+<%!-lmysqlclient%><%@ class mypage %><%#
+#include <mysql/mysql.h>
+#include <json/json.h>
+#include <cpoll/threadpool.H>
+#include <list>
+#include "connectioninfo.H"
+#include "generic_pool.H"
+#include "world.H"
+
+using namespace CP;
+using namespace cppsp;
+using namespace std;
+
+class myStatement
+{
+public:
+	MYSQL* db;
+	int rn;	//input param "id" for select statement
+	int r;	//output param "randomNumber" for select statement
+	long unsigned int len1;	//output length of "randomNumber" for select statement
+	int update_rn;	//input param "randomNumber" for update statement
+	int update_id;	//input param "id" for update statement
+	MYSQL_STMT* stmt;
+	MYSQL_STMT* stmt_update;
+	MYSQL_BIND param[1], results[1];
+	MYSQL_BIND param_update[2];
+	myStatement() {
+		db=doConnect(NULL);
+		
+		//select statement
+		stmt=mysql_stmt_init(db);
+		const char* sql="SELECT randomNumber FROM World WHERE id = ?";
+		if(mysql_stmt_prepare(stmt,sql,strlen(sql)))
+			throw runtime_error(mysql_stmt_error(stmt));
+		memset(param, 0, sizeof(param));
+		memset(results, 0, sizeof(results));
+		param[0].buffer_type = MYSQL_TYPE_LONG;
+		param[0].buffer = (char *)&rn;
+		param[0].buffer_length = sizeof(rn);
+		param[0].is_null = 0;
+		param[0].length = NULL;
+		results[0].buffer_type= MYSQL_TYPE_LONG;
+		results[0].buffer = &r;
+		results[0].buffer_length = sizeof(r);
+		results[0].is_null = 0;
+		results[0].length = &len1;
+		if(mysql_stmt_bind_param(stmt,param)) throw runtime_error(mysql_stmt_error(stmt));
+		if(mysql_stmt_bind_result(stmt, results)) throw runtime_error(mysql_stmt_error(stmt));
+		
+		//update statement
+		stmt_update=mysql_stmt_init(db);
+		sql="update World set randomNumber=? WHERE id = ?";
+		if(mysql_stmt_prepare(stmt_update,sql,strlen(sql)))
+			throw runtime_error(mysql_stmt_error(stmt_update));
+		memset(param_update, 0, sizeof(param_update));
+		param_update[0].buffer_type = MYSQL_TYPE_LONG;
+		param_update[0].buffer = (char *)&update_rn;
+		param_update[0].buffer_length = sizeof(update_rn);
+		param_update[0].is_null = 0;
+		param_update[0].length = NULL;
+		param_update[1].buffer_type = MYSQL_TYPE_LONG;
+		param_update[1].buffer = (char *)&update_id;
+		param_update[1].buffer_length = sizeof(update_id);
+		param_update[1].is_null = 0;
+		param_update[1].length = NULL;
+		if(mysql_stmt_bind_param(stmt_update,param_update))
+			throw runtime_error(mysql_stmt_error(stmt_update));
+	}
+	~myStatement() {
+		mysql_stmt_close(stmt);
+		mysql_stmt_close(stmt_update);
+		doDisconnect(NULL,db);
+	}
+};
+myStatement* cStatement(void*) {
+	return new myStatement();
+}
+void dStatement(void*, myStatement* s) {
+	delete s;
+}
+genericPool<myStatement*,128> stmtPool(&cStatement,&dStatement);
+
+ThreadPool tp(32);
+
+int curOperations=0;
+static const int maxOperations=MYSQL_MAX_CONNECTIONS;
+list<EventFD*> waitingThreads;
+
+void do_init_thread(void*) {
+	mysql_thread_init();
+}
+void do_deinit_thread(void*) {
+	mysql_thread_end();
+}
+extern "C" void initModule() {
+	mysql_library_init(0,NULL,NULL);
+	tp.afterStart=&do_init_thread;
+	tp.beforeExit=&do_deinit_thread;
+}
+
+%><%$
+EventFD efd{0,EFD_SEMAPHORE};
+int queries=1;
+world* items;
+myStatement* stmt;
+bool err=false;
+string errmsg;
+void tpFunc() {
+	//mysql_thread_init();
+	try {
+		for (int i=0;i<queries;i++){
+			items[i].id=stmt->rn=rand()%10000;	//id
+			if(mysql_stmt_execute(stmt->stmt)) throw runtime_error(mysql_stmt_error(stmt->stmt));
+			if(mysql_stmt_fetch(stmt->stmt)==0) {
+				while(mysql_stmt_fetch(stmt->stmt)==0);
+				//update to new random number
+				stmt->update_rn=rand()%10000; //new random number
+				stmt->update_id=stmt->rn; //id
+				if(mysql_stmt_execute(stmt->stmt_update))
+					throw runtime_error(mysql_stmt_error(stmt->stmt_update));
+				items[i].rnd=stmt->update_rn;
+			} else {
+				items[i].rnd=0;
+			}
+		}
+	}catch(exception& ex) {
+		err=true;
+		errmsg=ex.what();
+	}
+	efd.sendEvent(1);
+}
+//asynchronously load the data in the doInit() function, and defer page rendering until data is available
+void doInit() override {
+	if(unlikely(curOperations>=maxOperations)) {
+		//too many threads. need to wait.
+		waitingThreads.push_back(&efd);
+		efd.getEvent({&mypage::waitCB,this});
+		poll->add(efd);
+		return;
+	}
+	curOperations++;
+	auto it=request->queryString.find("queries");
+	if(it!=request->queryString.end()) {
+		queries=atoi((*it).second);
+	}
+	if(queries<1)queries=1;
+	if(queries>500)queries=500;
+	int i;
+	
+	items=(world*)sp->alloc(sizeof(world)*queries);
+	stmt=stmtPool.get();
+	poll->add(efd);
+	tp.invoke({&mypage::tpFunc,this});
+	efd.getEvent({&mypage::efdCB,this});
+}
+void efdCB(eventfd_t efdVal) {
+	curOperations--;
+	if(curOperations<maxOperations) {
+		if(unlikely(!waitingThreads.empty())) {
+			waitingThreads.front()->sendEvent(1);
+			waitingThreads.pop_front();
+		}
+	}
+	stmtPool.put(stmt);
+	Page::doInit();
+}
+void waitCB(eventfd_t efdVal) {
+	this->doInit();
+}
+
+%>[<%
+if(err) throw runtime_error(errmsg);
+for (int i=0;i<queries;i++){
+	if(i>0) output.write(',');
+	%>{"id":<%=items[i].id%>,"randomNumber":<%=items[i].rnd%>}<%
+}
+
+response->headers["Content-Type"]="application/json";
+response->headers["Server"]="cppsp/0.2";
+%>]