| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801 |
- /*
- Copyright 2009-2020, Sumeet Chhetri
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- /*
- * TeBkUmLpqAsyncUm.cpp
- *
- * Created on: 03-Feb-2020
- * Author: sumeetc
- */
- #include "TeBkUmLpqAsync.h"
- int TeBkUmLpqAsyncWorld::getId() const {
- return id;
- }
- void TeBkUmLpqAsyncWorld::setId(int id) {
- this->id = id;
- }
- int TeBkUmLpqAsyncWorld::getRandomNumber() const {
- return randomNumber;
- }
- void TeBkUmLpqAsyncWorld::setRandomNumber(int randomNumber) {
- this->randomNumber = randomNumber;
- }
- TeBkUmLpqAsyncWorld::TeBkUmLpqAsyncWorld(int id, int randomNumber) {
- this->id = id;
- this->randomNumber = randomNumber;
- }
- TeBkUmLpqAsyncWorld::TeBkUmLpqAsyncWorld(int id) {
- this->id = id;
- randomNumber = 0;
- }
- TeBkUmLpqAsyncWorld::TeBkUmLpqAsyncWorld() {
- id = 0;
- randomNumber = 0;
- }
- TeBkUmLpqAsyncWorld::~TeBkUmLpqAsyncWorld() {
- }
- int TeBkUmLpqAsyncFortune::getId() const {
- return id;
- }
- void TeBkUmLpqAsyncFortune::setId(int id) {
- this->id = id;
- }
- TeBkUmLpqAsyncFortune::TeBkUmLpqAsyncFortune(int id) {
- this->id = id;
- allocd = false;
- }
- TeBkUmLpqAsyncFortune::TeBkUmLpqAsyncFortune(int id, std::string message) {
- this->id = id;
- this->message_i = message;
- this->message = std::string_view(this->message_i);
- allocd = false;
- }
- TeBkUmLpqAsyncFortune::TeBkUmLpqAsyncFortune() {
- id = 0;
- allocd = false;
- }
- TeBkUmLpqAsyncFortune::~TeBkUmLpqAsyncFortune() {
- if(allocd && message.size()>0) {
- free((void *)message.data());
- }
- }
- bool TeBkUmLpqAsyncFortune::operator < (const TeBkUmLpqAsyncFortune& other) const {
- return message.compare(other.message)<0;
- }
- TeBkUmLpqAsyncMessage::TeBkUmLpqAsyncMessage() {
- }
- TeBkUmLpqAsyncMessage::TeBkUmLpqAsyncMessage(std::string message) {
- this->message = message;
- }
- TeBkUmLpqAsyncMessage::~TeBkUmLpqAsyncMessage() {
- }
- const std::string& TeBkUmLpqAsyncMessage::getMessage() const {
- return message;
- }
- void TeBkUmLpqAsyncMessage::setMessage(const std::string& message) {
- this->message = message;
- }
- const std::string TeBkUmLpqAsyncRouter::HELLO_WORLD = "Hello, World!";
- const std::string TeBkUmLpqAsyncRouter::WORLD = "world";
- const std::string TeBkUmLpqAsyncRouter::WORLD_ONE_QUERY = "select id,randomnumber from world where id=$1";
- const std::string TeBkUmLpqAsyncRouter::WORLD_ALL_QUERY = "select id,randomnumber from world";
- const std::string TeBkUmLpqAsyncRouter::FORTUNE_ALL_QUERY = "select id,message from fortune";
- std::unordered_map<int, std::string> TeBkUmLpqAsyncRouter::_qC;
- int TeBkUmLpqAsyncRouter::g_seed = 0;
- void TeBkUmLpqAsyncRouter::dbAsync(BaseSocket* sif) {
- LibpqDataSourceImpl* sqli = getDb(5);
- int rid = CommonUtils::fastrand(g_seed) % 10000 + 1;
- LibpqAsyncReq* areq = sqli->getAsyncRequest();
- LibpqQuery* q = areq->getQuery();
- q->withParamInt4(rid);
- #ifdef HAVE_LIBPQ
- q->withSelectQuery(WORLD_ONE_QUERY).withContext(sif).withCb0([](void* ctx, PGresult* res) {
- BaseSocket* sif = (BaseSocket*)ctx;
- TeBkUmLpqAsyncWorld wo;
- int cols = PQnfields(res);
- for (int j = 0; j < cols; ++j) {
- if(j==0)wo.setId(ntohl(*((uint32_t *) PQgetvalue(res, 0, j))));
- else wo.setRandomNumber(ntohl(*((uint32_t *) PQgetvalue(res, 0, j))));
- }
- HttpResponse r;
- std::string h;
- #ifdef HAVE_RAPID_JSON
- rapidjson::StringBuffer s;
- rapidjson::Writer<rapidjson::StringBuffer> w(s);
- wo.toJson(w);
- r.httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, 1.1, false, s.GetSize());
- sif->writeDirect(h, s.GetString(), s.GetSize());
- #else
- JSONSerialize::serializeObject(&wo, w_ser, r.getContentP());
- r.httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON);
- sif->writeDirect(h, r.getContent());
- #endif
- sif->unUse();
- });
- #endif
- sqli->postAsync(areq);
- }
- void TeBkUmLpqAsyncRouter::queriesAsync(const char* q, int ql, BaseSocket* sif) {
- int queryCount = 0;
- CommonUtils::fastStrToNum(q, ql, queryCount);
- queryCount = std::max(1, std::min(queryCount, 500));
- LibpqDataSourceImpl* sqli = getDb(3);
- LibpqAsyncReq* areq = sqli->getAsyncRequest();
- for (int c = 0; c < queryCount; ++c) {
- int rid = CommonUtils::fastrand(g_seed) % 10000 + 1;
- LibpqQuery* q = areq->getQuery();
- q->withParamInt4(rid);
- q->withSelectQuery(WORLD_ONE_QUERY);
- }
- #ifdef HAVE_LIBPQ
- areq->withFinalCb(sif, [](void* ctx, bool status, std::vector<PGresult*>* results, const std::string& q, int counter) {
- BaseSocket* sif = (BaseSocket*)ctx;
- std::vector<TeBkUmLpqAsyncWorld> vec;
- vec.reserve((int)results->size());
- for (int i = 0; i < (int)results->size(); ++i) {
- PGresult* res = results->at(i);
- int cols = PQnfields(res);
- for (int j = 0; j < cols; ++j) {
- if(j==0) vec.emplace_back(ntohl(*((uint32_t *) PQgetvalue(res, 0, j))));
- else vec.back().setRandomNumber(ntohl(*((uint32_t *) PQgetvalue(res, 0, j))));
- }
- }
- HttpResponse r;
- std::string h;
- #ifdef HAVE_RAPID_JSON
- rapidjson::StringBuffer s;
- rapidjson::Writer<rapidjson::StringBuffer> w(s);
- TeBkUmLpqAsyncWorld::toJson(vec, w);
- r.httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, 1.1, false, s.GetSize());
- sif->writeDirect(h, s.GetString(), s.GetSize());
- #else
- JSONSerialize::serializeObjectCont(&vec, wcont_ser, "vector", r.getContentP());
- r.httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, 1.1, false);
- sif->writeDirect(h, r.getContent());
- #endif
- sif->unUse();
- });
- #endif
- sqli->postAsync(areq);
- }
- void TeBkUmLpqAsyncRouter::queriesMultiAsync(const char* q, int ql, BaseSocket* sif) {
- int queryCount = 0;
- CommonUtils::fastStrToNum(q, ql, queryCount);
- queryCount = std::max(1, std::min(queryCount, 500));
- LibpqDataSourceImpl* sqli = getDb(3);
- std::stringstream ss;
- for (int c = 0; c < queryCount; ++c) {
- int rid = CommonUtils::fastrand(g_seed) % 10000 + 1;
- ss << "select id, randomnumber from world where id = " << rid << ";";
- }
- LibpqAsyncReq* areq = sqli->getAsyncRequest();
- LibpqQuery* qu = areq->getQuery();
- qu->withSelectQuery(ss.str()).withMulti();
- #ifdef HAVE_LIBPQ
- areq->withFinalCb(sif, [](void* ctx, bool status, std::vector<PGresult*>* results, const std::string& q, int counter) {
- BaseSocket* sif = (BaseSocket*)ctx;
- std::vector<TeBkUmLpqAsyncWorld> vec;
- vec.reserve((int)results->size());
- for (int i = 0; i < (int)results->size(); ++i) {
- PGresult* res = results->at(i);
- int cols = PQnfields(res);
- for (int j = 0; j < cols; ++j) {
- int tmp = 0;
- CommonUtils::fastStrToNum(PQgetvalue(res, 0, j), PQgetlength(res, 0, j), tmp);
- if(j==0) vec.emplace_back(tmp);
- else vec.back().setRandomNumber(tmp);
- }
- }
- HttpResponse r;
- std::string h;
- #ifdef HAVE_RAPID_JSON
- rapidjson::StringBuffer s;
- rapidjson::Writer<rapidjson::StringBuffer> w(s);
- TeBkUmLpqAsyncWorld::toJson(vec, w);
- r.httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, 1.1, false, s.GetSize());
- sif->writeDirect(h, s.GetString(), s.GetSize());
- #else
- JSONSerialize::serializeObjectCont(&vec, wcont_ser, "vector", r.getContentP());
- r.httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, 1.1, false);
- sif->writeDirect(h, r.getContent());
- #endif
- sif->unUse();
- });
- #endif
- sqli->postAsync(areq, queryCount);
- }
- void TeBkUmLpqAsyncRouter::updatesMulti(const char* q, int ql, AsyncUpdatesReq* req) {
- int queryCount = 0;
- CommonUtils::fastStrToNum(q, ql, queryCount);
- queryCount = std::max(1, std::min(queryCount, 500));
- req->vec.reserve(queryCount);
- req->sqli = getDb(3);
- std::stringstream ss;
- for (int c = 0; c < queryCount; ++c) {
- int rid = CommonUtils::fastrand(g_seed) % 10000 + 1;
- ss << "select id, randomnumber from world where id = " << rid << ";";
- }
- //req->ss << "begin;";//NEVER USE - this creates a deadlock issue (like, DETAIL: Process 16 waits for ShareLock on transaction 995; blocked by process 19.)
- LibpqAsyncReq* areq = req->sqli->getAsyncRequest();
- LibpqQuery* qu = areq->getQuery();
- qu->withSelectQuery(ss.str()).withMulti();
- #ifdef HAVE_LIBPQ
- areq->withFinalCb(req, [](void* ctx, bool status, std::vector<PGresult*>* results, const std::string& q, int counter) {
- AsyncUpdatesReq* req = (AsyncUpdatesReq*)ctx;
- if(status) {
- int queryCount = (int)results->size();
- std::stringstream ss;
- for (int i = 0; i < queryCount; ++i) {
- PGresult* res = results->at(i);
- int cols = PQnfields(res);
- for (int j = 0; j < cols; ++j) {
- int tmp = 0;
- CommonUtils::fastStrToNum(PQgetvalue(res, 0, j), PQgetlength(res, 0, j), tmp);
- if(j==0) req->vec.emplace_back(tmp);
- else {
- TeBkUmLpqAsyncWorld& w = req->vec.back();
- int newRandomNumber = CommonUtils::fastrand(g_seed) % 10000 + 1;
- if(tmp == newRandomNumber) {
- newRandomNumber += 1;
- if(newRandomNumber>=10000) {
- newRandomNumber = 1;
- }
- }
- w.setRandomNumber(newRandomNumber);
- ss << "begin;update world set randomnumber = " << newRandomNumber << " where id = " << w.getId() << ";commit;";
- }
- }
- }
- LibpqAsyncReq* areq = req->sqli->getAsyncRequest();
- LibpqQuery* qu = areq->getQuery();
- qu->withUpdateQuery(ss.str()).withMulti();
- areq->withFinalCb(req, [](void* ctx, bool status, std::vector<PGresult*>* results, const std::string& q, int counter) {
- AsyncUpdatesReq* req = (AsyncUpdatesReq*)ctx;
- if(status) {
- HttpResponse r;
- std::string h;
- #ifdef HAVE_RAPID_JSON
- rapidjson::StringBuffer s;
- rapidjson::Writer<rapidjson::StringBuffer> w(s);
- TeBkUmLpqAsyncWorld::toJson(req->vec, w);
- r.httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, 1.1, false, s.GetSize());
- req->sif->writeDirect(h, s.GetString(), s.GetSize());
- #else
- JSONSerialize::serializeObjectCont(&req->vec, wcont_ser, "vector", r.getContentP());
- r.httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, 1.1, false);
- req->sif->writeDirect(h, r.getContent());
- #endif
- } else {
- HttpResponse r;
- std::string h;
- r.httpStatus(HTTPResponseStatus::InternalServerError).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, req->httpVers, true);
- req->sif->writeDirect(h);
- }
- req->sif->unUse();
- delete req;
- });
- req->sqli->postAsync(areq, queryCount*3);
- }
- });
- #endif
- req->sqli->postAsync(areq, queryCount);
- }
- std::string& TeBkUmLpqAsyncRouter::getUpdQuery(int count) {
- std::unordered_map<int, std::string>::iterator it = _qC.find(count);
- if(it!=_qC.end()) {
- return it->second;
- }
- std::stringstream ss;
- ss << "update world as t set randomnumber = case id";
- int pc = 1;
- for (int c = 0; c < count; ++c) {
- ss << " when $";
- ss << pc++;
- ss << " then $";
- ss << pc++;
- }
- ss << " else randomnumber end where id in (";
- for (int c = 0; c < count; ++c) {
- ss << "$" << pc++ << ",";
- }
- std::string q = ss.str();
- q = q.substr(0, q.length()-1);
- q += ")";
- _qC[count] = std::move(q);
- return _qC[count];
- }
- void TeBkUmLpqAsyncRouter::updatesAsyncb(const char* q, int ql, AsyncUpdatesReq* req) {
- int queryCount = 0;
- CommonUtils::fastStrToNum(q, ql, queryCount);
- queryCount = std::max(1, std::min(queryCount, 500));
- req->vec.reserve(queryCount);
- req->sqli = getDb(3);
- LibpqAsyncReq* areq = req->sqli->getAsyncRequest();
- for (int c = 0; c < queryCount; ++c) {
- int rid = CommonUtils::fastrand(g_seed) % 10000 + 1;
- LibpqQuery* q = areq->getQuery();
- q->withParamInt4(rid);
- q->withSelectQuery(WORLD_ONE_QUERY);
- }
- #ifdef HAVE_LIBPQ
- areq->withFinalCb(req, [](void* ctx, bool status, std::vector<PGresult*>* results, const std::string& query, int counter) {
- AsyncUpdatesReq* req = (AsyncUpdatesReq*)ctx;
- int queryCount = (int)results->size();
- LibpqAsyncReq* areq = req->sqli->getAsyncRequest();
- req->sqli->beginAsync(areq);
- LibpqQuery* q = areq->getQuery();
- q->withUpdateQuery(getUpdQuery(queryCount)).withContext(req);
- for (int i = 0; i < queryCount; ++i) {
- PGresult* res = results->at(i);
- int cols = PQnfields(res);
- for (int j = 0; j < cols; ++j) {
- if(j==0) req->vec.emplace_back(ntohl(*((uint32_t *) PQgetvalue(res, 0, j))));
- else {
- int tmp = ntohl(*((uint32_t *) PQgetvalue(res, 0, j)));
- TeBkUmLpqAsyncWorld& w = req->vec.back();
- int newRandomNumber = CommonUtils::fastrand(g_seed) % 10000 + 1;
- if(tmp == newRandomNumber) {
- newRandomNumber += 1;
- if(newRandomNumber>=10000) {
- newRandomNumber = 1;
- }
- }
- w.setRandomNumber(newRandomNumber);
- q->withParamInt4(w.getId());
- q->withParamInt4(w.getRandomNumber());
- }
- }
- }
- for(auto w: req->vec) {
- q->withParamInt4(w.getId());
- }
- req->sqli->commitAsync(areq);
- areq->withFinalCb(req, [](void* ctx, bool status, std::vector<PGresult*>* results, const std::string& query, int counter) {
- AsyncUpdatesReq* req = (AsyncUpdatesReq*)ctx;
- if(status) {
- HttpResponse r;
- std::string h;
- #ifdef HAVE_RAPID_JSON
- rapidjson::StringBuffer s;
- rapidjson::Writer<rapidjson::StringBuffer> w(s);
- TeBkUmLpqAsyncWorld::toJson(req->vec, w);
- r.httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, 1.1, false, s.GetSize());
- req->sif->writeDirect(h, s.GetString(), s.GetSize());
- #else
- JSONSerialize::serializeObjectCont(&req->vec, wcont_ser, "vector", r.getContentP());
- r.httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, 1.1, false);
- req->sif->writeDirect(h, r.getContent());
- #endif
- } else {
- HttpResponse r;
- std::string h;
- r.httpStatus(HTTPResponseStatus::InternalServerError).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, req->httpVers, true);
- req->sif->writeDirect(h);
- }
- req->sif->unUse();
- delete req;
- });
- req->sqli->postAsync(areq);
- });
- #endif
- req->sqli->postAsync(areq);
- }
- void TeBkUmLpqAsyncRouter::updatesAsync(const char* q, int ql, AsyncUpdatesReq* req) {
- int queryCount = 0;
- CommonUtils::fastStrToNum(q, ql, queryCount);
- queryCount = std::max(1, std::min(queryCount, 500));
- req->vec.reserve(queryCount);
- req->sqli = getDb(3);
- LibpqAsyncReq* areq = req->sqli->getAsyncRequest();
- for (int c = 0; c < queryCount; ++c) {
- int rid = CommonUtils::fastrand(g_seed) % 10000 + 1;
- LibpqQuery* qu = areq->getQuery();
- qu->withParamInt4(rid);
- qu->withSelectQuery(WORLD_ONE_QUERY);
- }
- #ifdef HAVE_LIBPQ
- areq->withFinalCb(req, [](void* ctx, bool status, std::vector<PGresult*>* results, const std::string& query, int counter) {
- AsyncUpdatesReq* req = (AsyncUpdatesReq*)ctx;
- LibpqAsyncReq* areq = req->sqli->getAsyncRequest();
- for (int i = 0; i < (int)results->size(); ++i) {
- PGresult* res = results->at(i);
- int cols = PQnfields(res);
- for (int j = 0; j < cols; ++j) {
- if(j==0) req->vec.emplace_back(ntohl(*((uint32_t *) PQgetvalue(res, 0, j))));
- else {
- int tmp = ntohl(*((uint32_t *) PQgetvalue(res, 0, j)));
- TeBkUmLpqAsyncWorld& w = req->vec.back();
- int newRandomNumber = CommonUtils::fastrand(g_seed) % 10000 + 1;
- if(tmp == newRandomNumber) {
- newRandomNumber += 1;
- if(newRandomNumber>=10000) {
- newRandomNumber = 1;
- }
- }
- w.setRandomNumber(newRandomNumber);
- std::stringstream ss;
- ss << "update world set randomnumber = " << newRandomNumber << " where id = " << w.getId();
- req->sqli->beginAsync(areq);
- LibpqQuery* q = areq->getQuery();
- q->withUpdateQuery(ss.str(), false);
- req->sqli->commitAsync(areq);
- }
- }
- }
- areq->withFinalCb(req, [](void* ctx, bool status, std::vector<PGresult*>* results, const std::string& query, int counter) {
- AsyncUpdatesReq* req = (AsyncUpdatesReq*)ctx;
- if(status) {
- HttpResponse r;
- std::string h;
- #ifdef HAVE_RAPID_JSON
- rapidjson::StringBuffer s;
- rapidjson::Writer<rapidjson::StringBuffer> w(s);
- TeBkUmLpqAsyncWorld::toJson(req->vec, w);
- r.httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, 1.1, false, s.GetSize());
- req->sif->writeDirect(h, s.GetString(), s.GetSize());
- #else
- JSONSerialize::serializeObjectCont(&req->vec, wcont_ser, "vector", r.getContentP());
- r.httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, 1.1, false);
- req->sif->writeDirect(h, r.getContent());
- #endif
- } else {
- HttpResponse r;
- std::string h;
- r.httpStatus(HTTPResponseStatus::InternalServerError).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, req->httpVers, true);
- req->sif->writeDirect(h);
- }
- req->sif->unUse();
- delete req;
- });
- req->sqli->postAsync(areq);
- });
- #endif
- req->sqli->postAsync(areq);
- }
- void TeBkUmLpqAsyncRouter::updateCache() {
- LibpqDataSourceImpl* sqli = getDb(1);
- AsyncCacheReq* req = new AsyncCacheReq;
- req->cchi = CacheManager::getImpl();
- LibpqAsyncReq* areq = sqli->getAsyncRequest();
- LibpqQuery* q = areq->getQuery();
- q->withSelectQuery(WORLD_ALL_QUERY).withContext(req).withCb3([](void* ctx, bool endofdata, int row, int col, char* value) {
- AsyncCacheReq* req = (AsyncCacheReq*)ctx;
- if(col==0) {
- req->vec.emplace_back(ntohl(*((uint32_t *) value)));
- } else {
- req->vec.back().setRandomNumber(ntohl(*((uint32_t *) value)));
- }
- if(endofdata) {
- CacheInterface* cchi = req->cchi;
- try {
- for(std::vector<TeBkUmLpqAsyncWorld>::iterator it=req->vec.begin(); it != req->vec.end(); ++it) {
- char str[12];
- sprintf(str, "%d;%d", (*it).getId(), (*it).getRandomNumber());
- cchi->setRaw((*it).getId(), str);
- }
- CacheManager::cleanImpl(cchi);
- delete req;
- CacheManager::triggerAppInitCompletion("t4");
- } catch(const std::exception& e) {
- CacheManager::cleanImpl(cchi);
- delete req;
- }
- }
- });
- sqli->postAsync(areq);
- }
- void TeBkUmLpqAsyncRouter::cachedWorlds(const char* q, int ql, std::vector<TeBkUmLpqAsyncWorld>& wlst) {
- int queryCount = 0;
- CommonUtils::fastStrToNum(q, ql, queryCount);
- queryCount = std::max(1, std::min(queryCount, 500));
- wlst.reserve(queryCount);
- CacheInterface* cchi = CacheManager::getImpl();
- std::vector<unsigned long long> keys;
- for (int c = 0; c < queryCount; ++c) {
- keys.emplace_back(CommonUtils::fastrand(g_seed) % 10000 + 1);
- }
- std::vector<std::string> values;
- cchi->getValues(keys, values);
- for (int c = 0; c < queryCount; ++c) {
- std::string& v = values.at(c);
- size_t fn = v.find(";");
- int tmp = 0;
- CommonUtils::fastStrToNum(v.substr(0, fn).c_str(), fn, tmp);
- int tmp1 = 0;
- CommonUtils::fastStrToNum(v.substr(fn+1).c_str(), v.length()-fn-1, tmp1);
- wlst.emplace_back(tmp, tmp1);
- }
- CacheManager::cleanImpl(cchi);
- }
- void TeBkUmLpqAsyncRouter::fortunes(BaseSocket* sif) {
- LibpqDataSourceImpl* sqli = getDb(7);
- LibpqAsyncReq* areq = sqli->getAsyncRequest();
- LibpqQuery* q = areq->getQuery();
- #ifdef HAVE_LIBPQ
- q->withSelectQuery(FORTUNE_ALL_QUERY).withContext(sif).withCb0([](void* ctx, PGresult* res) {
- BaseSocket* sif = (BaseSocket*)ctx;
- std::list<TeBkUmLpqAsyncFortune> flst;
- int cols = PQnfields(res);
- int rows = PQntuples(res);
- for(int i=0; i<rows; i++) {
- for (int j = 0; j < cols; ++j) {
- if(j==0) {
- flst.emplace_back(ntohl(*((uint32_t *) PQgetvalue(res, i, j))));
- } else {
- TeBkUmLpqAsyncFortune& w = flst.back();
- w.message = CryptoHandler::sanitizeHtmlFast((const uint8_t *)PQgetvalue(res, i, j), (size_t)PQgetlength(res, i, j), w.message_i, w.allocd);
- }
- }
- }
- Context context;
- flst.emplace_back(0, "Additional fortune added at request time.");
- flst.sort();
- context.emplace("fortunes", &flst);
- fcpstream str;
- tmplFunc(&context, str);
- std::string out = str.str();
- HttpResponse r;
- std::string h;
- r.httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_TEXT_HTML, 1.1, false, (int)out.length());
- sif->writeDirect(h, out);
- sif->unUse();
- });
- #endif
- sqli->postAsync(areq);
- }
- bool TeBkUmLpqAsyncRouter::route(HttpRequest* req, HttpResponse* res, BaseSocket* sif) {
- sif->use();
- if(StringUtil::endsWith(req->getPath(), "/plaint")) {
- std::string h;
- res->httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_TEXT_PLAIN, (int)HELLO_WORLD.length());
- sif->writeDirect(h, HELLO_WORLD);
- sif->unUse();
- } else if(StringUtil::endsWith(req->getPath(), "/j")) {
- TeBkUmLpqAsyncMessage msg;
- msg.setMessage(HELLO_WORLD);
- std::string h;
- #ifdef HAVE_RAPID_JSON
- rapidjson::StringBuffer s;
- rapidjson::Writer<rapidjson::StringBuffer> w(s);
- msg.toJson(w);
- res->httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, 1.1, false, s.GetSize());
- sif->writeDirect(h, s.GetString(), s.GetSize());
- #else
- JSONSerialize::serializeObject(&msg, m_ser, res->getContentP());
- res->httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON);
- sif->writeDirect(h, res->getContent());
- #endif
- sif->unUse();
- } else if(StringUtil::endsWith(req->getPath(), "/d")) {
- /*AsyncDbReq* ar = new AsyncDbReq;
- ar->sif = sif;
- ar->httpVers = req->getHttpVers();
- ar->conn_clos = req->isClose();*/
- dbAsync(sif);
- } else if(StringUtil::endsWith(req->getPath(), "/quer")) {
- struct yuarel_param params[1];
- yuarel_parse_query((char*)req->getQueryStr().data(), req->getQueryStr().size(), params, 1);
- /*AsyncQueriesReq* ar = new AsyncQueriesReq;
- ar->sif = sif;
- ar->httpVers = req->getHttpVers();
- ar->conn_clos = req->isClose();*/
- queriesAsync(params[0].val, params[0].val_len, sif);
- } else if(StringUtil::endsWith(req->getPath(), "/quem")) {
- struct yuarel_param params[1];
- yuarel_parse_query((char*)req->getQueryStr().data(), req->getQueryStr().size(), params, 1);
- /*AsyncQueriesReq* ar = new AsyncQueriesReq;
- ar->sif = sif;
- ar->httpVers = req->getHttpVers();
- ar->conn_clos = req->isClose();*/
- queriesMultiAsync(params[0].val, params[0].val_len, sif);
- } else if(StringUtil::endsWith(req->getPath(), "/updm")) {
- struct yuarel_param params[1];
- yuarel_parse_query((char*)req->getQueryStr().data(), req->getQueryStr().size(), params, 1);
- AsyncUpdatesReq* ar = new AsyncUpdatesReq;
- ar->sif = sif;
- ar->httpVers = req->getHttpVers();
- ar->conn_clos = req->isClose();
- updatesMulti(params[0].val, params[0].val_len, ar);
- } else if(StringUtil::endsWith(req->getPath(), "/fortu")) {
- /*AsyncFortuneReq* ar = new AsyncFortuneReq;
- ar->sif = sif;
- ar->httpVers = req->getHttpVers();
- ar->conn_clos = req->isClose();*/
- fortunes(sif);
- } else if(StringUtil::endsWith(req->getPath(), "/bupdt") || StringUtil::endsWith(req->getPath(), "/updt")) {
- struct yuarel_param params[1];
- yuarel_parse_query((char*)req->getQueryStr().data(), req->getQueryStr().size(), params, 1);
- AsyncUpdatesReq* ar = new AsyncUpdatesReq;
- ar->sif = sif;
- ar->httpVers = req->getHttpVers();
- ar->conn_clos = req->isClose();
- updatesAsyncb(params[0].val, params[0].val_len, ar);
- } else if(StringUtil::endsWith(req->getPath(), "/upd_")) {
- struct yuarel_param params[1];
- yuarel_parse_query((char*)req->getQueryStr().data(), req->getQueryStr().size(), params, 1);
- AsyncUpdatesReq* ar = new AsyncUpdatesReq;
- ar->sif = sif;
- ar->httpVers = req->getHttpVers();
- ar->conn_clos = req->isClose();
- updatesAsync(params[0].val, params[0].val_len, ar);
- } else if(StringUtil::endsWith(req->getPath(), "/cached-wld")) {
- struct yuarel_param params[1];
- yuarel_parse_query((char*)req->getQueryStr().data(), req->getQueryStr().size(), params, 1);
- std::vector<TeBkUmLpqAsyncWorld> vec;
- cachedWorlds(params[0].val, params[0].val_len, vec);
- std::string h;
- #ifdef HAVE_RAPID_JSON
- rapidjson::StringBuffer s;
- rapidjson::Writer<rapidjson::StringBuffer> w(s);
- TeBkUmLpqAsyncWorld::toJson(vec, w);
- res->httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, 1.1, false, s.GetSize());
- sif->writeDirect(h, s.GetString(), s.GetSize());
- #else
- JSONSerialize::serializeObjectCont(&vec, wcont_ser, "vector", res->getContentP());
- res->httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, 1.1, false);
- sif->writeDirect(h, res->getContent());
- #endif
- sif->unUse();
- } else {
- std::string h;
- res->httpStatus(HTTPResponseStatus::NotFound).generateHeadResponse(h, req->getHttpVers(), true);
- sif->writeDirect(h);
- sif->unUse();
- }
- return false;
- }
- TemplatePtr TeBkUmLpqAsyncRouter::tmplFunc;
- Ser TeBkUmLpqAsyncRouter::m_ser;
- Ser TeBkUmLpqAsyncRouter::w_ser;
- SerCont TeBkUmLpqAsyncRouter::wcont_ser;
- TeBkUmLpqAsyncRouter::TeBkUmLpqAsyncRouter() {
- sqli = NULL;
- tmplFunc = TemplateUtil::getTemplateFunc("t4", "tpe/fortunes.tpe");
- m_ser = Serializer::getSerFuncForObject("t4", "TeBkUmLpqAsyncMessage");
- w_ser = Serializer::getSerFuncForObject("t4", "TeBkUmLpqAsyncWorld");
- wcont_ser = Serializer::getSerFuncForObjectCont("t4", "TeBkUmLpqAsyncWorld", "std::vector");
- }
- TeBkUmLpqAsyncRouter::~TeBkUmLpqAsyncRouter() {
- if(sqli!=NULL) {
- DataSourceManager::cleanRawImpl(sqli);
- }
- }
- LibpqDataSourceImpl* TeBkUmLpqAsyncRouter::getDb(int max) {
- if(sqli==NULL) {
- sqli = static_cast<LibpqDataSourceImpl*>(DataSourceManager::getRawImpl("PostgreSQL-DSN", "t4"));
- }
- return sqli;
- }
- LibpqDataSourceImpl* TeBkUmLpqAsyncRouterPooled::getDb(int max) {
- if(max==0) {
- max = maxconns;
- } else {
- max = std::min(max, maxconns);
- }
- int pc = 0;
- if(inited) {
- pc = ++opt;
- if(pc>=INT_MAX-1) {
- opt = 0;
- }
- } else {
- for (int var = 0; var < maxconns; ++var) {
- pool.push_back(static_cast<LibpqDataSourceImpl*>(DataSourceManager::getRawImpl("PostgreSQL-DSN", "t4", true)));
- }
- inited = true;
- }
- return pool.at(pc%max);
- }
- TeBkUmLpqAsyncRouterPooled::TeBkUmLpqAsyncRouterPooled() {
- maxconns = 7;
- propMap props = ConfigurationData::getAppProperties();
- if(props.size()>0) {
- if(props.find("dbpoolsize")!=props.end()) {
- try {
- maxconns = CastUtil::toInt(props["dbpoolsize"]);
- } catch(...) {
- }
- }
- }
- inited = false;
- opt = 0;
- }
- TeBkUmLpqAsyncRouterPooled::~TeBkUmLpqAsyncRouterPooled() {
- for(auto sqli: pool) {
- if(sqli!=NULL) {
- DataSourceManager::cleanRawImpl(sqli);
- }
- }
- }
|