db_pg_async 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. <%!-lpq%><%!-I/usr/include/postgresql%><%@ class mypage %><%#
  2. #include <libpq-fe.h>
  3. #include <postgres.h>
  4. #include <catalog/pg_type.h>
  5. #include <json/json.h>
  6. #include <cpoll/threadpool.H>
  7. #include <list>
  8. #include "connectioninfo.H"
  9. #include "generic_pool.H"
  10. #include "world.H"
  11. using namespace CP;
  12. using namespace cppsp;
  13. using namespace std;
  14. struct myStatement
  15. {
  16. public:
  17. PGconn* db;
  18. int paramLengths[1];
  19. int paramFormats[1];
  20. File f;
  21. Poll* p;
  22. bool addedToPoll=false;
  23. myStatement() {
  24. db=doConnect_pg(NULL);
  25. Oid oid=INT4OID;
  26. PGresult* res;
  27. if((res=PQprepare(db, "zxcvb", "SELECT randomnumber FROM World where id=$1", 1, &oid))==NULL)
  28. goto fail;
  29. PQclear(res);
  30. paramLengths[0] = sizeof(int);
  31. paramFormats[0] = 1; //binary
  32. return;
  33. fail:
  34. doDisconnect_pg(NULL,db);
  35. throw runtime_error("error preparing statement");
  36. }
  37. ~myStatement() {
  38. if(addedToPoll) {
  39. p->del(f);
  40. }
  41. f.deinit();
  42. doDisconnect_pg(NULL,db);
  43. }
  44. void exec(int id) {
  45. const char *params[1];
  46. id=htonl(id);
  47. params[0]=(const char*)&id;
  48. PQsendQueryPrepared(db,"zxcvb",1,params,paramLengths,paramFormats,1/*binary*/);
  49. }
  50. };
  51. myStatement* cStatement(void*) {
  52. return new myStatement();
  53. }
  54. void dStatement(void*, myStatement* s) {
  55. delete s;
  56. }
  57. genericPool<myStatement*,128> stmtPool(&cStatement,&dStatement);
  58. %><%$
  59. int queries=1;
  60. world* items;
  61. int n=0;
  62. myStatement* stmt;
  63. //asynchronously load the data in the doInit() function, and defer page rendering until data is available
  64. void doInit() override {
  65. auto it=request->queryString.find("queries");
  66. if(it!=request->queryString.end()) {
  67. queries=atoi((*it).second);
  68. }
  69. if(queries<1)queries=1;
  70. if(queries>500)queries=500;
  71. int i;
  72. items=(world*)sp->alloc(sizeof(world)*queries);
  73. stmt=stmtPool.get();
  74. if(!stmt->addedToPoll) {
  75. poll->add(stmt->f);
  76. stmt->addedToPoll=true;
  77. stmt->p=poll;
  78. }
  79. beginGetItems();
  80. }
  81. void beginGetItems() {
  82. if(n>=queries) {
  83. stmtPool.put(stmt);
  84. Page::doInit();
  85. return;
  86. }
  87. items[n++].id=rand()%10000;
  88. stmt->exec(items[n-1].id);
  89. stmt->f.waitForEvent(Events::in,{&mypage::evtIn,this});
  90. }
  91. void evtIn(int) {
  92. PGresult* res;
  93. res=PQgetResult(stmt->db);
  94. PGresult* res1;
  95. while((res1=PQgetResult(stmt->db))!=NULL) {
  96. PQclear(res1);
  97. }
  98. int tmp;
  99. if(PQntuples(res)>0)
  100. tmp=*(const int*)PQgetvalue(res,0,0);
  101. else tmp=0;
  102. items[n-1].rnd=ntohl(tmp);
  103. PQclear(res);
  104. beginGetItems();
  105. }
  106. %>[<%
  107. for (int i=0;i<queries;i++){
  108. if(i>0) output.write(',');
  109. %>{"id":<%=items[i].id%>,"randomNumber":<%=items[i].rnd%>}<%
  110. }
  111. response->headers["Content-Type"]="application/json";
  112. response->headers["Server"]="cppsp/0.2";
  113. %>]