|
@@ -19,6 +19,7 @@
|
|
|
* Created on: 03-Feb-2020
|
|
|
* Author: sumeetc
|
|
|
*/
|
|
|
+
|
|
|
#include "TeBkUmLpqAsync.h"
|
|
|
|
|
|
int TeBkUmLpqAsyncWorld::getId() const {
|
|
@@ -109,525 +110,528 @@ void TeBkUmLpqAsyncMessage::setMessage(const std::string& message) {
|
|
|
}
|
|
|
|
|
|
const std::string TeBkUmLpqAsyncRouter::HELLO_WORLD = "Hello, World!";
|
|
|
-std::string TeBkUmLpqAsyncRouter::WORLD = "world";
|
|
|
-std::string TeBkUmLpqAsyncRouter::WORLD_ONE_QUERY = "select id, randomnumber from world where id = $1";
|
|
|
-std::string TeBkUmLpqAsyncRouter::WORLD_ALL_QUERY = "select id, randomnumber from world";
|
|
|
-std::string TeBkUmLpqAsyncRouter::FORTUNE_ALL_QUERY = "select id, message from fortune";
|
|
|
-std::map<int, std::string> TeBkUmLpqAsyncRouter::_qC;
|
|
|
-
|
|
|
-void TeBkUmLpqAsyncRouter::dbAsync(AsyncReq* req) {
|
|
|
- LibpqDataSourceImpl* sqli = getDb();
|
|
|
- int rid = rand() % 10000 + 1;
|
|
|
- try {
|
|
|
- std::vector<LibpqParam> pars;
|
|
|
- LibpqDataSourceImpl::ADD_INT4(pars, rid);
|
|
|
- void* areq = sqli->executeQueryAsync(WORLD_ONE_QUERY, std::move(pars), req, &TeBkUmLpqAsyncRouter::dbAsyncUtil, &TeBkUmLpqAsyncRouter::dbAsyncCh, NULL);
|
|
|
- sqli->completeAsync(areq);
|
|
|
- } catch(const std::exception& e) {
|
|
|
- throw e;
|
|
|
- }
|
|
|
-}
|
|
|
-void TeBkUmLpqAsyncRouter::dbAsyncUtil(void* ctx, int rn, int cn, char * d) {
|
|
|
- AsyncReq* req = (AsyncReq*)ctx;
|
|
|
- if(cn==0)req->w.setId(ntohl(*((uint32_t *) d)));
|
|
|
- if(cn==1)req->w.setRandomNumber(ntohl(*((uint32_t *) d)));
|
|
|
-}
|
|
|
-void TeBkUmLpqAsyncRouter::dbAsyncCh(void* ctx, bool status, const std::string& q, int counter) {
|
|
|
- AsyncReq* req = (AsyncReq*)ctx;
|
|
|
- HttpResponse r;
|
|
|
- r.setHTTPResponseStatus(HTTPResponseStatus::Ok);
|
|
|
- JSONSerialize::serializeObject(&req->w, w_ser, r.getContentP());
|
|
|
- std::string d;
|
|
|
- r.generateHeadResponse(d, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, req->httpVers, req->conn_clos);
|
|
|
- req->sif->writeDirect(d);
|
|
|
- req->sif->writeDirect(r.getContent());
|
|
|
- req->sif->unUse();
|
|
|
- delete req;
|
|
|
+const std::string TeBkUmLpqAsyncRouter::WORLD = "world";
|
|
|
+const std::string TeBkUmLpqAsyncRouter::WORLD_ONE_QUERY = "select id,randomnumber from world where id=$1";
|
|
|
+const std::string TeBkUmLpqAsyncRouter::WORLD_ALL_QUERY = "select id,randomnumber from world";
|
|
|
+const std::string TeBkUmLpqAsyncRouter::FORTUNE_ALL_QUERY = "select id,message from fortune";
|
|
|
+std::unordered_map<int, std::string> TeBkUmLpqAsyncRouter::_qC;
|
|
|
+int TeBkUmLpqAsyncRouter::g_seed = 0;
|
|
|
+
|
|
|
+void TeBkUmLpqAsyncRouter::dbAsync(SocketInterface* sif) {
|
|
|
+ LibpqDataSourceImpl* sqli = getDb(5);
|
|
|
+ int rid = CommonUtils::fastrand(g_seed) % 10000 + 1;
|
|
|
+ LibpqAsyncReq* areq = sqli->getAsyncRequest();
|
|
|
+ LibpqQuery* q = areq->getQuery();
|
|
|
+ q->withParamInt4(rid);
|
|
|
+ q->withSelectQuery(WORLD_ONE_QUERY).withContext(sif).withCb0([](void* ctx, PGresult* res) {
|
|
|
+ SocketInterface* sif = (SocketInterface*)ctx;
|
|
|
+
|
|
|
+ TeBkUmLpqAsyncWorld w;
|
|
|
+ int cols = PQnfields(res);
|
|
|
+ for (int j = 0; j < cols; ++j) {
|
|
|
+ if(j==0)w.setId(ntohl(*((uint32_t *) PQgetvalue(res, 0, j))));
|
|
|
+ else w.setRandomNumber(ntohl(*((uint32_t *) PQgetvalue(res, 0, j))));
|
|
|
+ }
|
|
|
+
|
|
|
+ HttpResponse r;
|
|
|
+ JSONSerialize::serializeObject(&w, w_ser, r.getContentP());
|
|
|
+ std::string h;
|
|
|
+ r.httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, 1.1, false);
|
|
|
+ sif->writeDirect(h, r.getContent());
|
|
|
+ sif->unUse();
|
|
|
+ });
|
|
|
+ sqli->postAsync(areq);
|
|
|
}
|
|
|
|
|
|
+void TeBkUmLpqAsyncRouter::queriesAsync(const char* q, int ql, SocketInterface* sif) {
|
|
|
+ int queryCount = 0;
|
|
|
+ CommonUtils::fastStrToNum(q, ql, queryCount);
|
|
|
+ queryCount = std::max(1, std::min(queryCount, 500));
|
|
|
+
|
|
|
+ LibpqDataSourceImpl* sqli = getDb(3);
|
|
|
+ LibpqAsyncReq* areq = sqli->getAsyncRequest();
|
|
|
+ for (int c = 0; c < queryCount; ++c) {
|
|
|
+ int rid = CommonUtils::fastrand(g_seed) % 10000 + 1;
|
|
|
+ LibpqQuery* q = areq->getQuery();
|
|
|
+ q->withParamInt4(rid);
|
|
|
+ q->withSelectQuery(WORLD_ONE_QUERY);
|
|
|
+ }
|
|
|
+ areq->withFinalCb(sif, [](void* ctx, bool status, std::vector<PGresult*>* results, const std::string& q, int counter) {
|
|
|
+ SocketInterface* sif = (SocketInterface*)ctx;
|
|
|
+ std::vector<TeBkUmLpqAsyncWorld> vec;
|
|
|
+ vec.reserve((int)results->size());
|
|
|
+ for (int i = 0; i < (int)results->size(); ++i) {
|
|
|
+ PGresult* res = results->at(i);
|
|
|
+ int cols = PQnfields(res);
|
|
|
+ for (int j = 0; j < cols; ++j) {
|
|
|
+ if(j==0) vec.emplace_back(ntohl(*((uint32_t *) PQgetvalue(res, 0, j))));
|
|
|
+ else vec.back().setRandomNumber(ntohl(*((uint32_t *) PQgetvalue(res, 0, j))));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ HttpResponse r;
|
|
|
+ JSONSerialize::serializeObjectCont(&vec, wcont_ser, "vector", r.getContentP());
|
|
|
+ std::string h;
|
|
|
+ r.httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, 1.1, false);
|
|
|
+ sif->writeDirect(h, r.getContent());
|
|
|
+ sif->unUse();
|
|
|
+ });
|
|
|
+ sqli->postAsync(areq);
|
|
|
+}
|
|
|
|
|
|
-void TeBkUmLpqAsyncRouter::queriesAsync(const char* q, int ql, AsyncReq* req) {
|
|
|
+void TeBkUmLpqAsyncRouter::queriesMultiAsync(const char* q, int ql, SocketInterface* sif) {
|
|
|
int queryCount = 0;
|
|
|
- strToNum(q, ql, queryCount);
|
|
|
- if(queryCount<1)queryCount=1;
|
|
|
- else if(queryCount>500)queryCount=500;
|
|
|
+ CommonUtils::fastStrToNum(q, ql, queryCount);
|
|
|
+ queryCount = std::max(1, std::min(queryCount, 500));
|
|
|
|
|
|
- req->vec.reserve(queryCount);
|
|
|
+ LibpqDataSourceImpl* sqli = getDb(3);
|
|
|
|
|
|
- LibpqDataSourceImpl* sqli = getDb();
|
|
|
+ std::stringstream ss;
|
|
|
+ for (int c = 0; c < queryCount; ++c) {
|
|
|
+ int rid = CommonUtils::fastrand(g_seed) % 10000 + 1;
|
|
|
+ ss << "select id, randomnumber from world where id = " << rid << ";";
|
|
|
+ }
|
|
|
|
|
|
- try {
|
|
|
- void* areq = NULL;
|
|
|
- for (int c = 0; c < queryCount; ++c) {
|
|
|
- int rid = rand() % 10000 + 1;
|
|
|
- std::vector<LibpqParam> pars;
|
|
|
- LibpqDataSourceImpl::ADD_INT4(pars, rid);
|
|
|
- areq = sqli->executeQueryAsync(WORLD_ONE_QUERY, std::move(pars), req, &TeBkUmLpqAsyncRouter::queriesAsyncUtil, &TeBkUmLpqAsyncRouter::queriesAsyncCh, areq);
|
|
|
+ LibpqAsyncReq* areq = sqli->getAsyncRequest();
|
|
|
+ LibpqQuery* qu = areq->getQuery();
|
|
|
+ qu->withSelectQuery(ss.str()).withMulti();
|
|
|
+
|
|
|
+ areq->withFinalCb(sif, [](void* ctx, bool status, std::vector<PGresult*>* results, const std::string& q, int counter) {
|
|
|
+ SocketInterface* sif = (SocketInterface*)ctx;
|
|
|
+ std::vector<TeBkUmLpqAsyncWorld> vec;
|
|
|
+ vec.reserve((int)results->size());
|
|
|
+ for (int i = 0; i < (int)results->size(); ++i) {
|
|
|
+ PGresult* res = results->at(i);
|
|
|
+ int cols = PQnfields(res);
|
|
|
+ for (int j = 0; j < cols; ++j) {
|
|
|
+ int tmp = 0;
|
|
|
+ CommonUtils::fastStrToNum(PQgetvalue(res, 0, j), PQgetlength(res, 0, j), tmp);
|
|
|
+ if(j==0) vec.emplace_back(tmp);
|
|
|
+ else vec.back().setRandomNumber(tmp);
|
|
|
+ }
|
|
|
}
|
|
|
- sqli->completeAsync(areq);
|
|
|
- } catch(const std::exception& e) {
|
|
|
- throw e;
|
|
|
- }
|
|
|
-}
|
|
|
-void TeBkUmLpqAsyncRouter::queriesAsyncUtil(void* ctx, int rn, int cn, char * d) {
|
|
|
- AsyncReq* req = (AsyncReq*)ctx;
|
|
|
- if(cn==0) {
|
|
|
- req->vec.emplace_back(ntohl(*((uint32_t *) d)));
|
|
|
- } else {
|
|
|
- req->vec.back().setRandomNumber(ntohl(*((uint32_t *) d)));
|
|
|
- }
|
|
|
-}
|
|
|
-void TeBkUmLpqAsyncRouter::queriesAsyncCh(void* ctx, bool status, const std::string& q, int counter) {
|
|
|
- AsyncReq* req = (AsyncReq*)ctx;
|
|
|
- HttpResponse r;
|
|
|
- r.setHTTPResponseStatus(HTTPResponseStatus::Ok);
|
|
|
- JSONSerialize::serializeObjectCont(&req->vec, wcont_ser, "vector", r.getContentP());
|
|
|
- std::string d;
|
|
|
- r.generateHeadResponse(d, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, req->httpVers, req->conn_clos);
|
|
|
- req->sif->writeDirect(d);
|
|
|
- req->sif->writeDirect(r.getContent());
|
|
|
- req->sif->unUse();
|
|
|
- delete req;
|
|
|
-}
|
|
|
|
|
|
+ HttpResponse r;
|
|
|
+ JSONSerialize::serializeObjectCont(&vec, wcont_ser, "vector", r.getContentP());
|
|
|
+ std::string h;
|
|
|
+ r.httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, 1.1, false);
|
|
|
+ sif->writeDirect(h, r.getContent());
|
|
|
+ sif->unUse();
|
|
|
+ });
|
|
|
+ sqli->postAsync(areq, queryCount);
|
|
|
+}
|
|
|
|
|
|
-#ifndef HAVE_LIBPQ_BATCH
|
|
|
-void TeBkUmLpqAsyncRouter::queriesMultiAsync(const char* q, int ql, AsyncReq* req) {
|
|
|
+void TeBkUmLpqAsyncRouter::updatesMulti(const char* q, int ql, AsyncUpdatesReq* req) {
|
|
|
int queryCount = 0;
|
|
|
- strToNum(q, ql, queryCount);
|
|
|
- if(queryCount<1)queryCount=1;
|
|
|
- else if(queryCount>500)queryCount=500;
|
|
|
+ CommonUtils::fastStrToNum(q, ql, queryCount);
|
|
|
+ queryCount = std::max(1, std::min(queryCount, 500));
|
|
|
|
|
|
req->vec.reserve(queryCount);
|
|
|
+ req->sqli = getDb(3);
|
|
|
|
|
|
- LibpqDataSourceImpl* sqli = getDb();
|
|
|
-
|
|
|
- try {
|
|
|
- std::stringstream ss;
|
|
|
- for (int c = 0; c < queryCount; ++c) {
|
|
|
- int rid = rand() % 10000 + 1;
|
|
|
- ss << "select id, randomnumber from world where id = " << rid << ";";
|
|
|
- }
|
|
|
- void* areq = sqli->executeMultiQueryAsync(ss.str(), req, &TeBkUmLpqAsyncRouter::queriesMultiAsyncUtil, &TeBkUmLpqAsyncRouter::queriesMultiAsyncCh);
|
|
|
- sqli->completeAsync(areq, queryCount);
|
|
|
- } catch(const std::exception& e) {
|
|
|
- throw e;
|
|
|
- }
|
|
|
-}
|
|
|
-void TeBkUmLpqAsyncRouter::queriesMultiAsyncUtil(void* ctx, int rn, int cn, char * d, int l) {
|
|
|
- AsyncReq* req = (AsyncReq*)ctx;
|
|
|
- int tmp = 0;
|
|
|
- strToNum(d, l, tmp);
|
|
|
- if(cn==0) {
|
|
|
- req->vec.emplace_back(tmp);
|
|
|
- } else {
|
|
|
- req->vec.back().setRandomNumber(tmp);
|
|
|
+ std::stringstream ss;
|
|
|
+ for (int c = 0; c < queryCount; ++c) {
|
|
|
+ int rid = CommonUtils::fastrand(g_seed) % 10000 + 1;
|
|
|
+ ss << "select id, randomnumber from world where id = " << rid << ";";
|
|
|
}
|
|
|
-}
|
|
|
-void TeBkUmLpqAsyncRouter::queriesMultiAsyncCh(void* ctx, bool status, const std::string& q, int counter) {
|
|
|
- AsyncReq* req = (AsyncReq*)ctx;
|
|
|
- HttpResponse r;
|
|
|
- r.setHTTPResponseStatus(HTTPResponseStatus::Ok);
|
|
|
- JSONSerialize::serializeObjectCont(&req->vec, wcont_ser, "vector", r.getContentP());
|
|
|
- std::string d;
|
|
|
- r.generateHeadResponse(d, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, req->httpVers, req->conn_clos);
|
|
|
- req->sif->writeDirect(d);
|
|
|
- req->sif->writeDirect(r.getContent());
|
|
|
- req->sif->unUse();
|
|
|
- delete req;
|
|
|
-}
|
|
|
-#endif
|
|
|
|
|
|
+ //req->ss << "begin;";//NEVER USE - this creates a deadlock issue (like, DETAIL: Process 16 waits for ShareLock on transaction 995; blocked by process 19.)
|
|
|
+ LibpqAsyncReq* areq = req->sqli->getAsyncRequest();
|
|
|
+ LibpqQuery* qu = areq->getQuery();
|
|
|
+ qu->withSelectQuery(ss.str()).withMulti();
|
|
|
+
|
|
|
+ areq->withFinalCb(req, [](void* ctx, bool status, std::vector<PGresult*>* results, const std::string& q, int counter) {
|
|
|
+ AsyncUpdatesReq* req = (AsyncUpdatesReq*)ctx;
|
|
|
+ if(status) {
|
|
|
+ int queryCount = (int)results->size();
|
|
|
+
|
|
|
+ std::stringstream ss;
|
|
|
+ for (int i = 0; i < queryCount; ++i) {
|
|
|
+ PGresult* res = results->at(i);
|
|
|
+ int cols = PQnfields(res);
|
|
|
+ for (int j = 0; j < cols; ++j) {
|
|
|
+ int tmp = 0;
|
|
|
+ CommonUtils::fastStrToNum(PQgetvalue(res, 0, j), PQgetlength(res, 0, j), tmp);
|
|
|
+ if(j==0) req->vec.emplace_back(tmp);
|
|
|
+ else {
|
|
|
+ TeBkUmLpqAsyncWorld& w = req->vec.back();
|
|
|
+ int newRandomNumber = CommonUtils::fastrand(g_seed) % 10000 + 1;
|
|
|
+ if(tmp == newRandomNumber) {
|
|
|
+ newRandomNumber += 1;
|
|
|
+ if(newRandomNumber>=10000) {
|
|
|
+ newRandomNumber = 1;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ w.setRandomNumber(newRandomNumber);
|
|
|
+ ss << "begin;update world set randomnumber = " << newRandomNumber << " where id = " << w.getId() << ";commit;";
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
+ LibpqAsyncReq* areq = req->sqli->getAsyncRequest();
|
|
|
+ LibpqQuery* qu = areq->getQuery();
|
|
|
+ qu->withUpdateQuery(ss.str()).withMulti();
|
|
|
+
|
|
|
+ areq->withFinalCb(req, [](void* ctx, bool status, std::vector<PGresult*>* results, const std::string& q, int counter) {
|
|
|
+ AsyncUpdatesReq* req = (AsyncUpdatesReq*)ctx;
|
|
|
+ if(status) {
|
|
|
+ HttpResponse r;
|
|
|
+ JSONSerialize::serializeObjectCont(&req->vec, wcont_ser, "vector", r.getContentP());
|
|
|
+ std::string h;
|
|
|
+ r.httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, req->httpVers, req->conn_clos);
|
|
|
+ req->sif->writeDirect(h, r.getContent());
|
|
|
+ } else {
|
|
|
+ HttpResponse r;
|
|
|
+ std::string h;
|
|
|
+ r.httpStatus(HTTPResponseStatus::InternalServerError).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, req->httpVers, true);
|
|
|
+ req->sif->writeDirect(h);
|
|
|
+ }
|
|
|
+ req->sif->unUse();
|
|
|
+ delete req;
|
|
|
+ });
|
|
|
+ req->sqli->postAsync(areq, queryCount*3);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ req->sqli->postAsync(areq, queryCount);
|
|
|
+}
|
|
|
|
|
|
std::string& TeBkUmLpqAsyncRouter::getUpdQuery(int count) {
|
|
|
- std::map<int, std::string>::iterator it = _qC.find(count);
|
|
|
+ std::unordered_map<int, std::string>::iterator it = _qC.find(count);
|
|
|
if(it!=_qC.end()) {
|
|
|
return it->second;
|
|
|
}
|
|
|
|
|
|
std::stringstream ss;
|
|
|
- ss << "update world as t set randomnumber = case id ";
|
|
|
+ ss << "update world as t set randomnumber = case id";
|
|
|
|
|
|
int pc = 1;
|
|
|
for (int c = 0; c < count; ++c) {
|
|
|
- ss << "when $";
|
|
|
+ ss << " when $";
|
|
|
ss << pc++;
|
|
|
ss << " then $";
|
|
|
ss << pc++;
|
|
|
}
|
|
|
- ss << "else randomnumber end where id in (";
|
|
|
+ ss << " else randomnumber end where id in (";
|
|
|
for (int c = 0; c < count; ++c) {
|
|
|
ss << "$" << pc++ << ",";
|
|
|
}
|
|
|
std::string q = ss.str();
|
|
|
q = q.substr(0, q.length()-1);
|
|
|
q += ")";
|
|
|
+
|
|
|
_qC[count] = std::move(q);
|
|
|
return _qC[count];
|
|
|
}
|
|
|
-
|
|
|
-void TeBkUmLpqAsyncRouter::updatesAsyncb(const char* q, int ql, AsyncReq* req) {
|
|
|
+void TeBkUmLpqAsyncRouter::updatesAsyncb(const char* q, int ql, AsyncUpdatesReq* req) {
|
|
|
int queryCount = 0;
|
|
|
- strToNum(q, ql, queryCount);
|
|
|
- if(queryCount<1)queryCount=1;
|
|
|
- else if(queryCount>500)queryCount=500;
|
|
|
+ CommonUtils::fastStrToNum(q, ql, queryCount);
|
|
|
+ queryCount = std::max(1, std::min(queryCount, 500));
|
|
|
|
|
|
req->vec.reserve(queryCount);
|
|
|
-
|
|
|
- LibpqDataSourceImpl* sqli = getDb();
|
|
|
- req->sqli = sqli;
|
|
|
-
|
|
|
- try {
|
|
|
- void* areq = NULL;
|
|
|
- for (int c = 0; c < queryCount; ++c) {
|
|
|
- int rid = rand() % 10000 + 1;
|
|
|
- std::vector<LibpqParam> pars;
|
|
|
- LibpqDataSourceImpl::ADD_INT4(pars, rid);
|
|
|
- areq = sqli->executeQueryAsync(WORLD_ONE_QUERY, std::move(pars), req, &TeBkUmLpqAsyncRouter::queriesAsyncUtil, &TeBkUmLpqAsyncRouter::updatesAsyncbChQ, areq);
|
|
|
- }
|
|
|
- sqli->completeAsync(areq);
|
|
|
- } catch(const std::exception& e) {
|
|
|
- throw e;
|
|
|
+ req->sqli = getDb(3);
|
|
|
+
|
|
|
+ LibpqAsyncReq* areq = req->sqli->getAsyncRequest();
|
|
|
+ for (int c = 0; c < queryCount; ++c) {
|
|
|
+ int rid = CommonUtils::fastrand(g_seed) % 10000 + 1;
|
|
|
+ LibpqQuery* q = areq->getQuery();
|
|
|
+ q->withParamInt4(rid);
|
|
|
+ q->withSelectQuery(WORLD_ONE_QUERY);
|
|
|
}
|
|
|
-}
|
|
|
-void TeBkUmLpqAsyncRouter::updatesAsyncbChQ(void* ctx, bool status, const std::string& q, int counter) {
|
|
|
- AsyncReq* req = (AsyncReq*)ctx;
|
|
|
-
|
|
|
- LibpqDataSourceImpl* sqli = req->sqli;
|
|
|
-
|
|
|
- int queryCount = (int)req->vec.size();
|
|
|
- std::vector<LibpqParam> pars;
|
|
|
-
|
|
|
- for(std::vector<TeBkUmLpqAsyncWorld>::iterator it=req->vec.begin(); it != req->vec.end(); ++it) {
|
|
|
- LibpqDataSourceImpl::ADD_INT4(pars, (*it).getId());
|
|
|
-
|
|
|
- int newRandomNumber = rand() % 10000 + 1;
|
|
|
- if((*it).getRandomNumber() == newRandomNumber) {
|
|
|
- newRandomNumber += 1;
|
|
|
- if(newRandomNumber>=10000) {
|
|
|
- newRandomNumber = 1;
|
|
|
+ areq->withFinalCb(req, [](void* ctx, bool status, std::vector<PGresult*>* results, const std::string& query, int counter) {
|
|
|
+ AsyncUpdatesReq* req = (AsyncUpdatesReq*)ctx;
|
|
|
+
|
|
|
+ int queryCount = (int)results->size();
|
|
|
+
|
|
|
+ LibpqAsyncReq* areq = req->sqli->getAsyncRequest();
|
|
|
+ req->sqli->beginAsync(areq);
|
|
|
+ LibpqQuery* q = areq->getQuery();
|
|
|
+ q->withUpdateQuery(getUpdQuery(queryCount)).withContext(req);
|
|
|
+
|
|
|
+ for (int i = 0; i < queryCount; ++i) {
|
|
|
+ PGresult* res = results->at(i);
|
|
|
+ int cols = PQnfields(res);
|
|
|
+ for (int j = 0; j < cols; ++j) {
|
|
|
+ if(j==0) req->vec.emplace_back(ntohl(*((uint32_t *) PQgetvalue(res, 0, j))));
|
|
|
+ else {
|
|
|
+ int tmp = ntohl(*((uint32_t *) PQgetvalue(res, 0, j)));
|
|
|
+ TeBkUmLpqAsyncWorld& w = req->vec.back();
|
|
|
+ int newRandomNumber = CommonUtils::fastrand(g_seed) % 10000 + 1;
|
|
|
+ if(tmp == newRandomNumber) {
|
|
|
+ newRandomNumber += 1;
|
|
|
+ if(newRandomNumber>=10000) {
|
|
|
+ newRandomNumber = 1;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ w.setRandomNumber(newRandomNumber);
|
|
|
+ q->withParamInt4(w.getId());
|
|
|
+ q->withParamInt4(w.getRandomNumber());
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
- LibpqDataSourceImpl::ADD_INT4(pars, newRandomNumber);
|
|
|
- (*it).setRandomNumber(newRandomNumber);
|
|
|
- }
|
|
|
- for(std::vector<TeBkUmLpqAsyncWorld>::iterator it=req->vec.begin(); it != req->vec.end(); ++it) {
|
|
|
- LibpqDataSourceImpl::ADD_INT4(pars, (*it).getId());
|
|
|
- }
|
|
|
-
|
|
|
- void* areq = sqli->beginAsync(NULL);
|
|
|
- sqli->executeUpdateQueryAsync(getUpdQuery(queryCount), std::move(pars), NULL, NULL, areq, true);
|
|
|
- sqli->commitAsync(areq);
|
|
|
-
|
|
|
- AsyncReq* ar = new AsyncReq;
|
|
|
- ar->sif = req->sif;
|
|
|
- ar->httpVers = req->httpVers;
|
|
|
- ar->conn_clos = req->conn_clos;
|
|
|
- ar->vec = std::move(req->vec);
|
|
|
- req->sif = NULL;
|
|
|
-
|
|
|
- try {
|
|
|
- sqli->completeAsync(areq, ar, &TeBkUmLpqAsyncRouter::updatesAsyncbChU);
|
|
|
- } catch(const std::exception& e) {
|
|
|
- throw e;
|
|
|
- }
|
|
|
-}
|
|
|
-void TeBkUmLpqAsyncRouter::updatesAsyncbChU(void* ctx, bool status, const std::string& q, int counter) {
|
|
|
- AsyncReq* req = (AsyncReq*)ctx;
|
|
|
- HttpResponse r;
|
|
|
- r.setHTTPResponseStatus(HTTPResponseStatus::Ok);
|
|
|
- JSONSerialize::serializeObjectCont(&req->vec, wcont_ser, "vector", r.getContentP());
|
|
|
- std::string d;
|
|
|
- r.generateHeadResponse(d, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, req->httpVers, req->conn_clos);
|
|
|
- req->sif->writeDirect(d);
|
|
|
- req->sif->writeDirect(r.getContent());
|
|
|
- req->sif->unUse();
|
|
|
- delete req;
|
|
|
+ for(auto w: req->vec) {
|
|
|
+ q->withParamInt4(w.getId());
|
|
|
+ }
|
|
|
+ req->sqli->commitAsync(areq);
|
|
|
+
|
|
|
+ areq->withFinalCb(req, [](void* ctx, bool status, std::vector<PGresult*>* results, const std::string& query, int counter) {
|
|
|
+ AsyncUpdatesReq* req = (AsyncUpdatesReq*)ctx;
|
|
|
+ if(status) {
|
|
|
+ HttpResponse r;
|
|
|
+ JSONSerialize::serializeObjectCont(&req->vec, wcont_ser, "vector", r.getContentP());
|
|
|
+ std::string h;
|
|
|
+ r.httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, req->httpVers, req->conn_clos);
|
|
|
+ req->sif->writeDirect(h, r.getContent());
|
|
|
+ } else {
|
|
|
+ HttpResponse r;
|
|
|
+ std::string h;
|
|
|
+ r.httpStatus(HTTPResponseStatus::InternalServerError).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, req->httpVers, true);
|
|
|
+ req->sif->writeDirect(h);
|
|
|
+ }
|
|
|
+ req->sif->unUse();
|
|
|
+ delete req;
|
|
|
+ });
|
|
|
+ req->sqli->postAsync(areq);
|
|
|
+ });
|
|
|
+ req->sqli->postAsync(areq);
|
|
|
}
|
|
|
|
|
|
-void TeBkUmLpqAsyncRouter::updatesAsync(const char* q, int ql, AsyncReq* req) {
|
|
|
+void TeBkUmLpqAsyncRouter::updatesAsync(const char* q, int ql, AsyncUpdatesReq* req) {
|
|
|
int queryCount = 0;
|
|
|
- strToNum(q, ql, queryCount);
|
|
|
- if(queryCount<1)queryCount=1;
|
|
|
- else if(queryCount>500)queryCount=500;
|
|
|
+ CommonUtils::fastStrToNum(q, ql, queryCount);
|
|
|
+ queryCount = std::max(1, std::min(queryCount, 500));
|
|
|
|
|
|
req->vec.reserve(queryCount);
|
|
|
|
|
|
- LibpqDataSourceImpl* sqli = getDb();
|
|
|
- req->sqli = sqli;
|
|
|
+ req->sqli = getDb(3);
|
|
|
|
|
|
- try {
|
|
|
- void* areq = NULL;
|
|
|
- for (int c = 0; c < queryCount; ++c) {
|
|
|
- int rid = rand() % 10000 + 1;
|
|
|
- std::vector<LibpqParam> pars;
|
|
|
- LibpqDataSourceImpl::ADD_INT4(pars, rid);
|
|
|
- areq = sqli->executeQueryAsync(WORLD_ONE_QUERY, std::move(pars), req, &TeBkUmLpqAsyncRouter::queriesAsyncUtil, &TeBkUmLpqAsyncRouter::updatesAsyncChQ, areq);
|
|
|
- }
|
|
|
- sqli->completeAsync(areq);
|
|
|
- } catch(const std::exception& e) {
|
|
|
- throw e;
|
|
|
+ LibpqAsyncReq* areq = req->sqli->getAsyncRequest();
|
|
|
+ for (int c = 0; c < queryCount; ++c) {
|
|
|
+ int rid = CommonUtils::fastrand(g_seed) % 10000 + 1;
|
|
|
+ LibpqQuery* qu = areq->getQuery();
|
|
|
+ qu->withParamInt4(rid);
|
|
|
+ qu->withSelectQuery(WORLD_ONE_QUERY);
|
|
|
}
|
|
|
-}
|
|
|
-void TeBkUmLpqAsyncRouter::updatesAsyncChQ(void* ctx, bool status, const std::string& q, int counter) {
|
|
|
- AsyncReq* req = (AsyncReq*)ctx;
|
|
|
-
|
|
|
- std::stringstream ss;
|
|
|
- //ss << "update world as t set randomnumber = c.randomnumber from (values";
|
|
|
-
|
|
|
- LibpqDataSourceImpl* sqli = req->sqli;
|
|
|
-
|
|
|
- void* areq = NULL;
|
|
|
- for(std::vector<TeBkUmLpqAsyncWorld>::iterator it=req->vec.begin(); it != req->vec.end(); ++it) {
|
|
|
- int newRandomNumber = rand() % 10000 + 1;
|
|
|
- if((*it).getRandomNumber() == newRandomNumber) {
|
|
|
- newRandomNumber += 1;
|
|
|
- if(newRandomNumber>=10000) {
|
|
|
- newRandomNumber = 1;
|
|
|
+ areq->withFinalCb(req, [](void* ctx, bool status, std::vector<PGresult*>* results, const std::string& query, int counter) {
|
|
|
+ AsyncUpdatesReq* req = (AsyncUpdatesReq*)ctx;
|
|
|
+ LibpqAsyncReq* areq = req->sqli->getAsyncRequest();
|
|
|
+
|
|
|
+ for (int i = 0; i < (int)results->size(); ++i) {
|
|
|
+ PGresult* res = results->at(i);
|
|
|
+ int cols = PQnfields(res);
|
|
|
+ for (int j = 0; j < cols; ++j) {
|
|
|
+ if(j==0) req->vec.emplace_back(ntohl(*((uint32_t *) PQgetvalue(res, 0, j))));
|
|
|
+ else {
|
|
|
+ int tmp = ntohl(*((uint32_t *) PQgetvalue(res, 0, j)));
|
|
|
+ TeBkUmLpqAsyncWorld& w = req->vec.back();
|
|
|
+ int newRandomNumber = CommonUtils::fastrand(g_seed) % 10000 + 1;
|
|
|
+ if(tmp == newRandomNumber) {
|
|
|
+ newRandomNumber += 1;
|
|
|
+ if(newRandomNumber>=10000) {
|
|
|
+ newRandomNumber = 1;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ w.setRandomNumber(newRandomNumber);
|
|
|
+
|
|
|
+ std::stringstream ss;
|
|
|
+ ss << "update world set randomnumber = " << newRandomNumber << " where id = " << w.getId();
|
|
|
+
|
|
|
+ req->sqli->beginAsync(areq);
|
|
|
+ LibpqQuery* q = areq->getQuery();
|
|
|
+ q->withUpdateQuery(ss.str(), false);
|
|
|
+ req->sqli->commitAsync(areq);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
- (*it).setRandomNumber(newRandomNumber);
|
|
|
- if(areq==NULL) {
|
|
|
- areq = sqli->beginAsync(areq);
|
|
|
- } else {
|
|
|
- sqli->beginAsync(areq);
|
|
|
- }
|
|
|
- ss.str(std::string());
|
|
|
- std::vector<LibpqParam> pars;
|
|
|
- ss << "update world set randomnumber = " << newRandomNumber << " where id = " << (*it).getId();
|
|
|
- sqli->executeUpdateQueryAsync(ss.str(), std::move(pars), NULL, NULL, areq, false);
|
|
|
- sqli->commitAsync(areq);
|
|
|
- /*if(c!=queryCount-1) {
|
|
|
- ss << ",";
|
|
|
- }*/
|
|
|
- }
|
|
|
- //ss << ") as c(id, randomnumber) where c.id = t.id";
|
|
|
-
|
|
|
- AsyncReq* ar = new AsyncReq;
|
|
|
- ar->sif = req->sif;
|
|
|
- ar->httpVers = req->httpVers;
|
|
|
- ar->conn_clos = req->conn_clos;
|
|
|
- ar->vec = std::move(req->vec);
|
|
|
- req->sif = NULL;
|
|
|
-
|
|
|
- try {
|
|
|
- sqli->completeAsync(areq, ar, &TeBkUmLpqAsyncRouter::updatesAsyncChU);
|
|
|
- } catch(const std::exception& e) {
|
|
|
- throw e;
|
|
|
- }
|
|
|
-}
|
|
|
-void TeBkUmLpqAsyncRouter::updatesAsyncChU(void* ctx, bool status, const std::string& q, int counter) {
|
|
|
- AsyncReq* req = (AsyncReq*)ctx;
|
|
|
- HttpResponse r;
|
|
|
- r.setHTTPResponseStatus(HTTPResponseStatus::Ok);
|
|
|
- JSONSerialize::serializeObjectCont(&req->vec, wcont_ser, "vector", r.getContentP());
|
|
|
- std::string d;
|
|
|
- r.generateHeadResponse(d, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, req->httpVers, req->conn_clos);
|
|
|
- req->sif->writeDirect(d);
|
|
|
- req->sif->writeDirect(r.getContent());
|
|
|
- req->sif->unUse();
|
|
|
- delete req;
|
|
|
+
|
|
|
+ areq->withFinalCb(req, [](void* ctx, bool status, std::vector<PGresult*>* results, const std::string& query, int counter) {
|
|
|
+ AsyncUpdatesReq* req = (AsyncUpdatesReq*)ctx;
|
|
|
+ if(status) {
|
|
|
+ HttpResponse r;
|
|
|
+ JSONSerialize::serializeObjectCont(&req->vec, wcont_ser, "vector", r.getContentP());
|
|
|
+ std::string h;
|
|
|
+ r.httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, req->httpVers, req->conn_clos);
|
|
|
+ req->sif->writeDirect(h, r.getContent());
|
|
|
+ } else {
|
|
|
+ HttpResponse r;
|
|
|
+ std::string h;
|
|
|
+ r.httpStatus(HTTPResponseStatus::InternalServerError).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, req->httpVers, true);
|
|
|
+ req->sif->writeDirect(h);
|
|
|
+ }
|
|
|
+ req->sif->unUse();
|
|
|
+ delete req;
|
|
|
+ });
|
|
|
+ req->sqli->postAsync(areq);
|
|
|
+ });
|
|
|
+ req->sqli->postAsync(areq);
|
|
|
}
|
|
|
|
|
|
void TeBkUmLpqAsyncRouter::updateCache() {
|
|
|
- LibpqDataSourceImpl* sqli = getDb();
|
|
|
+ LibpqDataSourceImpl* sqli = getDb(1);
|
|
|
|
|
|
- CacheReq* req = new CacheReq;
|
|
|
+ AsyncCacheReq* req = new AsyncCacheReq;
|
|
|
req->cchi = CacheManager::getImpl();
|
|
|
|
|
|
- try {
|
|
|
- std::vector<LibpqParam> pars;
|
|
|
- void* areq = sqli->executeQueryAsync(WORLD_ALL_QUERY, std::move(pars), req, &TeBkUmLpqAsyncRouter::updateCacheAsyncUtil, &TeBkUmLpqAsyncRouter::updateCacheAsyncCh, NULL);
|
|
|
- sqli->completeAsync(areq);
|
|
|
- } catch(const std::exception& e) {
|
|
|
- throw e;
|
|
|
- }
|
|
|
-}
|
|
|
-void TeBkUmLpqAsyncRouter::updateCacheAsyncUtil(void* ctx, int rn, std::vector<LibpqRes>& data) {
|
|
|
- CacheReq* req = (CacheReq*)ctx;
|
|
|
- req->vec.emplace_back(ntohl(*((uint32_t *) data.at(0).d)), ntohl(*((uint32_t *) data.at(1).d)));
|
|
|
-}
|
|
|
-void TeBkUmLpqAsyncRouter::updateCacheAsyncCh(void* ctx, bool status, const std::string& q, int counter) {
|
|
|
- CacheReq* req = (CacheReq*)ctx;
|
|
|
- CacheInterface* cchi = req->cchi;
|
|
|
+ LibpqAsyncReq* areq = sqli->getAsyncRequest();
|
|
|
+ LibpqQuery* q = areq->getQuery();
|
|
|
+ q->withSelectQuery(WORLD_ALL_QUERY).withContext(req).withCb3([](void* ctx, bool endofdata, int row, int col, char* value) {
|
|
|
+ AsyncCacheReq* req = (AsyncCacheReq*)ctx;
|
|
|
+ if(col==0) {
|
|
|
+ req->vec.emplace_back(ntohl(*((uint32_t *) value)));
|
|
|
+ } else {
|
|
|
+ req->vec.back().setRandomNumber(ntohl(*((uint32_t *) value)));
|
|
|
+ }
|
|
|
|
|
|
- try {
|
|
|
- for(std::vector<TeBkUmLpqAsyncWorld>::iterator it=req->vec.begin(); it != req->vec.end(); ++it) {
|
|
|
- char str[12];
|
|
|
- sprintf(str, "%d;%d", (*it).getId(), (*it).getRandomNumber());
|
|
|
- cchi->setRaw(CastUtil::fromNumber((*it).getId()), str);
|
|
|
+ if(endofdata) {
|
|
|
+ CacheInterface* cchi = req->cchi;
|
|
|
+ try {
|
|
|
+ for(std::vector<TeBkUmLpqAsyncWorld>::iterator it=req->vec.begin(); it != req->vec.end(); ++it) {
|
|
|
+ char str[12];
|
|
|
+ sprintf(str, "%d;%d", (*it).getId(), (*it).getRandomNumber());
|
|
|
+ cchi->setRaw((*it).getId(), str);
|
|
|
+ }
|
|
|
+ CacheManager::cleanImpl(cchi);
|
|
|
+ delete req;
|
|
|
+ CacheManager::triggerAppInitCompletion("te-benchmark-um-pq-async");
|
|
|
+ } catch(const std::exception& e) {
|
|
|
+ CacheManager::cleanImpl(cchi);
|
|
|
+ delete req;
|
|
|
+ }
|
|
|
}
|
|
|
- CacheManager::cleanImpl(cchi);
|
|
|
- delete req;
|
|
|
- CacheManager::triggerAppInitCompletion("te-benchmark-um-pq-async");
|
|
|
- } catch(const std::exception& e) {
|
|
|
- CacheManager::cleanImpl(cchi);
|
|
|
- delete req;
|
|
|
- throw e;
|
|
|
- }
|
|
|
+ });
|
|
|
+ sqli->postAsync(areq);
|
|
|
}
|
|
|
-
|
|
|
void TeBkUmLpqAsyncRouter::cachedWorlds(const char* q, int ql, std::vector<TeBkUmLpqAsyncWorld>& wlst) {
|
|
|
int queryCount = 0;
|
|
|
- strToNum(q, ql, queryCount);
|
|
|
- if(queryCount<1)queryCount=1;
|
|
|
- else if(queryCount>500)queryCount=500;
|
|
|
+ CommonUtils::fastStrToNum(q, ql, queryCount);
|
|
|
+ queryCount = std::max(1, std::min(queryCount, 500));
|
|
|
|
|
|
wlst.reserve(queryCount);
|
|
|
|
|
|
CacheInterface* cchi = CacheManager::getImpl();
|
|
|
|
|
|
- try {
|
|
|
- std::vector<std::string> keys;
|
|
|
- for (int c = 0; c < queryCount; ++c) {
|
|
|
- int rid = rand() % 10000 + 1;
|
|
|
- std::string v = cchi->getValue(CastUtil::fromNumber(rid));
|
|
|
- size_t fn = v.find(";");
|
|
|
- int tmp = 0;
|
|
|
- strToNum(v.substr(0, fn).c_str(), fn, tmp);
|
|
|
- int tmp1 = 0;
|
|
|
- strToNum(v.substr(fn+1).c_str(), v.length()-fn-1, tmp1);
|
|
|
- wlst.emplace_back(tmp, tmp1);
|
|
|
- }
|
|
|
- CacheManager::cleanImpl(cchi);
|
|
|
- } catch(const std::exception& e) {
|
|
|
- CacheManager::cleanImpl(cchi);
|
|
|
- throw e;
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-
|
|
|
-void TeBkUmLpqAsyncRouter::getContextAsync(AsyncReq* req) {
|
|
|
- LibpqDataSourceImpl* sqli = getDb();
|
|
|
-
|
|
|
- try {
|
|
|
- std::vector<LibpqParam> pars;
|
|
|
- void* areq = sqli->executeQueryAsync(FORTUNE_ALL_QUERY, std::move(pars), req, &TeBkUmLpqAsyncRouter::getContextAsyncUtil, &TeBkUmLpqAsyncRouter::getContextAsyncCh, NULL);
|
|
|
- sqli->completeAsync(areq);
|
|
|
- } catch(...) {
|
|
|
- throw;
|
|
|
+ std::vector<unsigned long long> keys;
|
|
|
+ for (int c = 0; c < queryCount; ++c) {
|
|
|
+ keys.emplace_back(CommonUtils::fastrand(g_seed) % 10000 + 1);
|
|
|
}
|
|
|
-}
|
|
|
-void TeBkUmLpqAsyncRouter::getContextAsyncUtil(void* ctx, int rn, int cn, char * d, int l) {
|
|
|
- AsyncReq* req = (AsyncReq*)ctx;
|
|
|
- if(cn==0) {
|
|
|
- req->flst.emplace_back(ntohl(*((uint32_t *) d)));
|
|
|
- } else {
|
|
|
- TeBkUmLpqAsyncFortune& w = req->flst.back();
|
|
|
- w.message = CryptoHandler::sanitizeHtmlFast((const uint8_t *)d, (size_t)l, w.message_i, w.allocd);
|
|
|
+ std::vector<std::string> values;
|
|
|
+ cchi->getValues(keys, values);
|
|
|
+ for (int c = 0; c < queryCount; ++c) {
|
|
|
+ std::string& v = values.at(c);
|
|
|
+ size_t fn = v.find(";");
|
|
|
+ int tmp = 0;
|
|
|
+ CommonUtils::fastStrToNum(v.substr(0, fn).c_str(), fn, tmp);
|
|
|
+ int tmp1 = 0;
|
|
|
+ CommonUtils::fastStrToNum(v.substr(fn+1).c_str(), v.length()-fn-1, tmp1);
|
|
|
+ wlst.emplace_back(tmp, tmp1);
|
|
|
}
|
|
|
-}
|
|
|
+ CacheManager::cleanImpl(cchi);
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+void TeBkUmLpqAsyncRouter::fortunes(SocketInterface* sif) {
|
|
|
+ LibpqDataSourceImpl* sqli = getDb(7);
|
|
|
+ LibpqAsyncReq* areq = sqli->getAsyncRequest();
|
|
|
+ LibpqQuery* q = areq->getQuery();
|
|
|
+ q->withSelectQuery(FORTUNE_ALL_QUERY).withContext(sif).withCb0([](void* ctx, PGresult* res) {
|
|
|
+ SocketInterface* sif = (SocketInterface*)ctx;
|
|
|
+
|
|
|
+ std::list<TeBkUmLpqAsyncFortune> flst;
|
|
|
+ int cols = PQnfields(res);
|
|
|
+ int rows = PQntuples(res);
|
|
|
+ for(int i=0; i<rows; i++) {
|
|
|
+ for (int j = 0; j < cols; ++j) {
|
|
|
+ if(j==0) {
|
|
|
+ flst.emplace_back(ntohl(*((uint32_t *) PQgetvalue(res, i, j))));
|
|
|
+ } else {
|
|
|
+ TeBkUmLpqAsyncFortune& w = flst.back();
|
|
|
+ w.message = CryptoHandler::sanitizeHtmlFast((const uint8_t *)PQgetvalue(res, i, j), (size_t)PQgetlength(res, i, j), w.message_i, w.allocd);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
-void TeBkUmLpqAsyncRouter::getContextAsyncCh(void* ctx, bool status, const std::string& q, int counter) {
|
|
|
- AsyncReq* req = (AsyncReq*)ctx;
|
|
|
- Context context;
|
|
|
+ Context context;
|
|
|
|
|
|
- req->flst.emplace_back(0, "Additional fortune added at request time.");
|
|
|
- req->flst.sort();
|
|
|
+ flst.emplace_back(0, "Additional fortune added at request time.");
|
|
|
+ flst.sort();
|
|
|
|
|
|
- context.insert(std::pair<std::string, void*>("fortunes", &req->flst));
|
|
|
+ context.emplace("fortunes", &flst);
|
|
|
|
|
|
- if(tmplFunc!=NULL)
|
|
|
- {
|
|
|
fcpstream str;
|
|
|
tmplFunc(&context, str);
|
|
|
+ std::string out = str.str();
|
|
|
HttpResponse r;
|
|
|
- r.setHTTPResponseStatus(HTTPResponseStatus::Ok);
|
|
|
- std::string d;
|
|
|
- r.generateHeadResponse(d, ContentTypes::CONTENT_TYPE_TEXT_SHTML, req->httpVers, req->conn_clos, (int)str.str().length());
|
|
|
- req->sif->writeDirect(d);
|
|
|
- req->sif->writeDirect(str.str());
|
|
|
- req->sif->unUse();
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- ResponseData d;
|
|
|
- HttpResponse r;
|
|
|
- r.generateHeadResponse(d._b, req->httpVers, req->conn_clos);
|
|
|
- req->sif->writeTo(&d);
|
|
|
- req->sif->unUse();
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-//https://stackoverflow.com/questions/9631225/convert-strings-specified-by-length-not-nul-terminated-to-int-float
|
|
|
-bool TeBkUmLpqAsyncRouter::strToNum(const char* str, int len, int& ret) {
|
|
|
- ret = 0;
|
|
|
- for(int i = 0; i < len; ++i)
|
|
|
- {
|
|
|
- if(!isdigit(str[i])) return false;
|
|
|
- ret = ret * 10 + (str[i] - '0');
|
|
|
- }
|
|
|
- return true;
|
|
|
+ std::string h;
|
|
|
+ r.httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_TEXT_HTML, 1.1, false, (int)out.length());
|
|
|
+ sif->writeDirect(h, out);
|
|
|
+ sif->unUse();
|
|
|
+ });
|
|
|
+ sqli->postAsync(areq);
|
|
|
}
|
|
|
|
|
|
bool TeBkUmLpqAsyncRouter::route(HttpRequest* req, HttpResponse* res, SocketInterface* sif) {
|
|
|
sif->use();
|
|
|
if(StringUtil::endsWith(req->getPath(), "/plaintext")) {
|
|
|
- res->setHTTPResponseStatus(HTTPResponseStatus::Ok);
|
|
|
- std::string d;
|
|
|
- res->generateHeadResponse(d, ContentTypes::CONTENT_TYPE_TEXT_PLAIN, (int)HELLO_WORLD.length());
|
|
|
- sif->writeDirect(d);
|
|
|
- sif->writeDirect(HELLO_WORLD);
|
|
|
+ std::string h;
|
|
|
+ res->httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_TEXT_PLAIN, (int)HELLO_WORLD.length());
|
|
|
+ sif->writeDirect(h, HELLO_WORLD);
|
|
|
sif->unUse();
|
|
|
} else if(StringUtil::endsWith(req->getPath(), "/json")) {
|
|
|
TeBkUmLpqAsyncMessage msg;
|
|
|
msg.setMessage(HELLO_WORLD);
|
|
|
- res->setHTTPResponseStatus(HTTPResponseStatus::Ok);
|
|
|
JSONSerialize::serializeObject(&msg, m_ser, res->getContentP());
|
|
|
- std::string d;
|
|
|
- res->generateHeadResponse(d, ContentTypes::CONTENT_TYPE_APPLICATION_JSON);
|
|
|
- sif->writeDirect(d);
|
|
|
- sif->writeDirect(res->getContent());
|
|
|
+ std::string h;
|
|
|
+ res->httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON);
|
|
|
+ sif->writeDirect(h, res->getContent());
|
|
|
sif->unUse();
|
|
|
} else if(StringUtil::endsWith(req->getPath(), "/db")) {
|
|
|
- AsyncReq* ar = new AsyncReq;
|
|
|
+ /*AsyncDbReq* ar = new AsyncDbReq;
|
|
|
ar->sif = sif;
|
|
|
ar->httpVers = req->getHttpVers();
|
|
|
- ar->conn_clos = req->isClose();
|
|
|
- dbAsync(ar);
|
|
|
+ ar->conn_clos = req->isClose();*/
|
|
|
+ dbAsync(sif);
|
|
|
} else if(StringUtil::endsWith(req->getPath(), "/queries")) {
|
|
|
struct yuarel_param params[1];
|
|
|
yuarel_parse_query((char*)req->getQueryStr().data(), req->getQueryStr().size(), params, 1);
|
|
|
- AsyncReq* ar = new AsyncReq;
|
|
|
+ /*AsyncQueriesReq* ar = new AsyncQueriesReq;
|
|
|
ar->sif = sif;
|
|
|
ar->httpVers = req->getHttpVers();
|
|
|
- ar->conn_clos = req->isClose();
|
|
|
- queriesAsync(params[0].val, params[0].val_len, ar);
|
|
|
- }
|
|
|
-#ifndef HAVE_LIBPQ_BATCH
|
|
|
- else if(StringUtil::endsWith(req->getPath(), "/queriem")) {
|
|
|
+ ar->conn_clos = req->isClose();*/
|
|
|
+ queriesAsync(params[0].val, params[0].val_len, sif);
|
|
|
+ } else if(StringUtil::endsWith(req->getPath(), "/queriem")) {
|
|
|
struct yuarel_param params[1];
|
|
|
yuarel_parse_query((char*)req->getQueryStr().data(), req->getQueryStr().size(), params, 1);
|
|
|
- AsyncReq* ar = new AsyncReq;
|
|
|
+ /*AsyncQueriesReq* ar = new AsyncQueriesReq;
|
|
|
ar->sif = sif;
|
|
|
+ ar->httpVers = req->getHttpVers();
|
|
|
+ ar->conn_clos = req->isClose();*/
|
|
|
+ queriesMultiAsync(params[0].val, params[0].val_len, sif);
|
|
|
+ } else if(StringUtil::endsWith(req->getPath(), "/updatem")) {
|
|
|
+ struct yuarel_param params[1];
|
|
|
+ yuarel_parse_query((char*)req->getQueryStr().data(), req->getQueryStr().size(), params, 1);
|
|
|
+ AsyncUpdatesReq* ar = new AsyncUpdatesReq;
|
|
|
ar->sif = sif;
|
|
|
ar->httpVers = req->getHttpVers();
|
|
|
ar->conn_clos = req->isClose();
|
|
|
- queriesMultiAsync(params[0].val, params[0].val_len, ar);
|
|
|
+ updatesMulti(params[0].val, params[0].val_len, ar);
|
|
|
}
|
|
|
-#endif
|
|
|
else if(StringUtil::endsWith(req->getPath(), "/fortunes")) {
|
|
|
- AsyncReq* ar = new AsyncReq;
|
|
|
- ar->sif = sif;
|
|
|
+ /*AsyncFortuneReq* ar = new AsyncFortuneReq;
|
|
|
ar->sif = sif;
|
|
|
ar->httpVers = req->getHttpVers();
|
|
|
- ar->conn_clos = req->isClose();
|
|
|
- getContextAsync(ar);
|
|
|
- } else if(StringUtil::endsWith(req->getPath(), "/bupdates")) {
|
|
|
+ ar->conn_clos = req->isClose();*/
|
|
|
+ fortunes(sif);
|
|
|
+ } else if(StringUtil::endsWith(req->getPath(), "/bupdates") || StringUtil::endsWith(req->getPath(), "/updates")) {
|
|
|
struct yuarel_param params[1];
|
|
|
yuarel_parse_query((char*)req->getQueryStr().data(), req->getQueryStr().size(), params, 1);
|
|
|
- AsyncReq* ar = new AsyncReq;
|
|
|
- ar->sif = sif;
|
|
|
+ AsyncUpdatesReq* ar = new AsyncUpdatesReq;
|
|
|
ar->sif = sif;
|
|
|
ar->httpVers = req->getHttpVers();
|
|
|
ar->conn_clos = req->isClose();
|
|
|
updatesAsyncb(params[0].val, params[0].val_len, ar);
|
|
|
- } else if(StringUtil::endsWith(req->getPath(), "/updates")) {
|
|
|
+ } else if(StringUtil::endsWith(req->getPath(), "/update_")) {
|
|
|
struct yuarel_param params[1];
|
|
|
yuarel_parse_query((char*)req->getQueryStr().data(), req->getQueryStr().size(), params, 1);
|
|
|
- AsyncReq* ar = new AsyncReq;
|
|
|
- ar->sif = sif;
|
|
|
+ AsyncUpdatesReq* ar = new AsyncUpdatesReq;
|
|
|
ar->sif = sif;
|
|
|
ar->httpVers = req->getHttpVers();
|
|
|
ar->conn_clos = req->isClose();
|
|
@@ -637,18 +641,15 @@ bool TeBkUmLpqAsyncRouter::route(HttpRequest* req, HttpResponse* res, SocketInte
|
|
|
yuarel_parse_query((char*)req->getQueryStr().data(), req->getQueryStr().size(), params, 1);
|
|
|
std::vector<TeBkUmLpqAsyncWorld> msg;
|
|
|
cachedWorlds(params[0].val, params[0].val_len, msg);
|
|
|
- res->setHTTPResponseStatus(HTTPResponseStatus::Ok);
|
|
|
JSONSerialize::serializeObjectCont(&msg, wcont_ser, "vector", res->getContentP());
|
|
|
- std::string d;
|
|
|
- res->generateHeadResponse(d, ContentTypes::CONTENT_TYPE_APPLICATION_JSON);
|
|
|
- sif->writeDirect(d);
|
|
|
- sif->writeDirect(res->getContent());
|
|
|
+ std::string h;
|
|
|
+ res->httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON);
|
|
|
+ sif->writeDirect(h, res->getContent());
|
|
|
sif->unUse();
|
|
|
} else {
|
|
|
- res->setHTTPResponseStatus(HTTPResponseStatus::NotFound);
|
|
|
- std::string d;
|
|
|
- res->generateHeadResponse(d, ContentTypes::CONTENT_TYPE_TEXT_PLAIN);
|
|
|
- sif->writeDirect(d);
|
|
|
+ std::string h;
|
|
|
+ res->httpStatus(HTTPResponseStatus::NotFound).generateHeadResponse(h, req->getHttpVers(), true);
|
|
|
+ sif->writeDirect(h);
|
|
|
sif->unUse();
|
|
|
}
|
|
|
return false;
|
|
@@ -668,11 +669,58 @@ TeBkUmLpqAsyncRouter::TeBkUmLpqAsyncRouter() {
|
|
|
}
|
|
|
|
|
|
TeBkUmLpqAsyncRouter::~TeBkUmLpqAsyncRouter() {
|
|
|
+ if(sqli!=NULL) {
|
|
|
+ DataSourceManager::cleanRawImpl(sqli);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
-LibpqDataSourceImpl* TeBkUmLpqAsyncRouter::getDb() {
|
|
|
+LibpqDataSourceImpl* TeBkUmLpqAsyncRouter::getDb(int max) {
|
|
|
if(sqli==NULL) {
|
|
|
sqli = static_cast<LibpqDataSourceImpl*>(DataSourceManager::getRawImpl("PostgreSQL-DSN", "te-benchmark-um-pq-async"));
|
|
|
}
|
|
|
return sqli;
|
|
|
}
|
|
|
+
|
|
|
+LibpqDataSourceImpl* TeBkUmLpqAsyncRouterPooled::getDb(int max) {
|
|
|
+ if(max==0) {
|
|
|
+ max = maxconns;
|
|
|
+ } else {
|
|
|
+ max = std::min(max, maxconns);
|
|
|
+ }
|
|
|
+ int pc = 0;
|
|
|
+ if(inited) {
|
|
|
+ pc = ++opt;
|
|
|
+ if(pc>=INT_MAX-1) {
|
|
|
+ opt = 0;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ for (int var = 0; var < maxconns; ++var) {
|
|
|
+ pool.push_back(static_cast<LibpqDataSourceImpl*>(DataSourceManager::getRawImpl("PostgreSQL-DSN", "te-benchmark-um-pq-async", true)));
|
|
|
+ }
|
|
|
+ inited = true;
|
|
|
+ }
|
|
|
+ return pool.at(pc%max);
|
|
|
+}
|
|
|
+
|
|
|
+TeBkUmLpqAsyncRouterPooled::TeBkUmLpqAsyncRouterPooled() {
|
|
|
+ maxconns = 7;
|
|
|
+ propMap props = ConfigurationData::getAppProperties();
|
|
|
+ if(props.size()>0) {
|
|
|
+ if(props.find("dbpoolsize")!=props.end()) {
|
|
|
+ try {
|
|
|
+ maxconns = CastUtil::toInt(props["dbpoolsize"]);
|
|
|
+ } catch(...) {
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ inited = false;
|
|
|
+ opt = 0;
|
|
|
+}
|
|
|
+
|
|
|
+TeBkUmLpqAsyncRouterPooled::~TeBkUmLpqAsyncRouterPooled() {
|
|
|
+ for(auto sqli: pool) {
|
|
|
+ if(sqli!=NULL) {
|
|
|
+ DataSourceManager::cleanRawImpl(sqli);
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|