db_pg_threadpool 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. <%!-lpq%><%!-I/usr/include/postgresql%><%@ class mypage %><%#
  2. #include <libpq-fe.h>
  3. #include <postgres.h>
  4. #include <catalog/pg_type.h>
  5. #include <json/json.h>
  6. #include <cpoll/threadpool.H>
  7. #include <list>
  8. #include "connectioninfo.H"
  9. #include "generic_pool.H"
  10. #include "world.H"
  11. using namespace CP;
  12. using namespace cppsp;
  13. using namespace std;
  14. class myStatement
  15. {
  16. public:
  17. PGconn* db;
  18. int paramLengths[1];
  19. int paramFormats[1];
  20. myStatement() {
  21. db=doConnect_pg(NULL);
  22. Oid oid=INT4OID;
  23. PGresult* res;
  24. if((res=PQprepare(db, "zxcvb", "SELECT randomnumber FROM World where id=$1", 1, &oid))==NULL)
  25. goto fail;
  26. PQclear(res);
  27. paramLengths[0] = sizeof(int);
  28. paramFormats[0] = 1; //binary
  29. return;
  30. fail:
  31. doDisconnect_pg(NULL,db);
  32. throw runtime_error("error preparing statement");
  33. }
  34. ~myStatement() {
  35. doDisconnect_pg(NULL,db);
  36. }
  37. PGresult* exec(int id) {
  38. const char *params[1];
  39. id=htonl(id);
  40. params[0]=(const char*)&id;
  41. return PQexecPrepared(db,"zxcvb",1,params,paramLengths,paramFormats,1/*binary*/);
  42. }
  43. };
  44. myStatement* cStatement(void*) {
  45. return new myStatement();
  46. }
  47. void dStatement(void*, myStatement* s) {
  48. delete s;
  49. }
  50. genericPool<myStatement*,64> stmtPool(&cStatement,&dStatement);
  51. ThreadPool tp(32);
  52. %><%$
  53. EventFD efd{0,EFD_SEMAPHORE};
  54. int queries=1;
  55. world* items;
  56. myStatement* stmt;
  57. void tpFunc() {
  58. for (int i=0;i<queries;i++){
  59. PGresult* res;
  60. if((res=stmt->exec(items[i].id=(rand()%10000)))==NULL) throw bad_alloc();
  61. if(PQntuples(res)>0) {
  62. items[i].rnd=ntohl(*(const int*)PQgetvalue(res,0,0));
  63. }
  64. else items[i].rnd=0;
  65. PQclear(res);
  66. }
  67. efd.sendEvent(1);
  68. }
  69. //asynchronously load the data in the doInit() function, and defer page rendering until data is available
  70. void doInit() override {
  71. auto it=request->queryString.find("queries");
  72. if(it!=request->queryString.end()) {
  73. queries=atoi((*it).second);
  74. }
  75. if(queries<1)queries=1;
  76. if(queries>500)queries=500;
  77. items=(world*)sp->alloc(sizeof(world)*queries);
  78. stmt=stmtPool.get();
  79. poll->add(efd);
  80. tp.invoke({&mypage::tpFunc,this});
  81. efd.getEvent({&mypage::efdCB,this});
  82. }
  83. void efdCB(eventfd_t efdVal) {
  84. stmtPool.put(stmt);
  85. Page::doInit();
  86. }
  87. void waitCB(eventfd_t efdVal) {
  88. this->doInit();
  89. }
  %>[<%
    // Emit the fetched rows as a JSON array of objects; a comma goes in front
    // of every element except the first.
    for (int idx=0; idx<queries; ++idx) {
      if(idx!=0) output.write(',');
      %>{"id":<%=items[idx].id%>,"randomNumber":<%=items[idx].rnd%>}<%
    }
    // Response headers for the JSON payload.
    response->headers["Content-Type"]="application/json";
    response->headers["Server"]="cppsp/0.2";
  %>]