TeBkUmLpqAsync.cpp 26 KB


  1. /*
  2. Copyright 2009-2020, Sumeet Chhetri
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. /*
  14. * TeBkUmLpqAsync.cpp
  15. *
  16. * Created on: 03-Feb-2020
  17. * Author: sumeetc
  18. */
  19. #include "TeBkUmLpqAsync.h"
  20. int TeBkUmLpqAsyncWorld::getId() const {
  21. return id;
  22. }
  23. void TeBkUmLpqAsyncWorld::setId(int id) {
  24. this->id = id;
  25. }
  26. int TeBkUmLpqAsyncWorld::getRandomNumber() const {
  27. return randomNumber;
  28. }
  29. void TeBkUmLpqAsyncWorld::setRandomNumber(int randomNumber) {
  30. this->randomNumber = randomNumber;
  31. }
  32. TeBkUmLpqAsyncWorld::TeBkUmLpqAsyncWorld(int id, int randomNumber) {
  33. this->id = id;
  34. this->randomNumber = randomNumber;
  35. }
  36. TeBkUmLpqAsyncWorld::TeBkUmLpqAsyncWorld(int id) {
  37. this->id = id;
  38. randomNumber = 0;
  39. }
  40. TeBkUmLpqAsyncWorld::TeBkUmLpqAsyncWorld() {
  41. id = 0;
  42. randomNumber = 0;
  43. }
  44. TeBkUmLpqAsyncWorld::~TeBkUmLpqAsyncWorld() {
  45. }
  46. int TeBkUmLpqAsyncFortune::getId() const {
  47. return id;
  48. }
  49. void TeBkUmLpqAsyncFortune::setId(int id) {
  50. this->id = id;
  51. }
  52. TeBkUmLpqAsyncFortune::TeBkUmLpqAsyncFortune(int id) {
  53. this->id = id;
  54. allocd = false;
  55. }
  56. TeBkUmLpqAsyncFortune::TeBkUmLpqAsyncFortune(int id, std::string message) {
  57. this->id = id;
  58. this->message_i = message;
  59. this->message = std::string_view(this->message_i);
  60. allocd = false;
  61. }
  62. TeBkUmLpqAsyncFortune::TeBkUmLpqAsyncFortune() {
  63. id = 0;
  64. allocd = false;
  65. }
  66. TeBkUmLpqAsyncFortune::~TeBkUmLpqAsyncFortune() {
  67. if(allocd && message.size()>0) {
  68. free((void *)message.data());
  69. }
  70. }
  71. bool TeBkUmLpqAsyncFortune::operator < (const TeBkUmLpqAsyncFortune& other) const {
  72. return message.compare(other.message)<0;
  73. }
  74. TeBkUmLpqAsyncMessage::TeBkUmLpqAsyncMessage() {
  75. }
  76. TeBkUmLpqAsyncMessage::TeBkUmLpqAsyncMessage(std::string message) {
  77. this->message = message;
  78. }
  79. TeBkUmLpqAsyncMessage::~TeBkUmLpqAsyncMessage() {
  80. }
  81. const std::string& TeBkUmLpqAsyncMessage::getMessage() const {
  82. return message;
  83. }
  84. void TeBkUmLpqAsyncMessage::setMessage(const std::string& message) {
  85. this->message = message;
  86. }
  87. const std::string TeBkUmLpqAsyncRouter::HELLO_WORLD = "Hello, World!";
  88. const std::string TeBkUmLpqAsyncRouter::WORLD = "world";
  89. const std::string TeBkUmLpqAsyncRouter::WORLD_ONE_QUERY = "select id,randomnumber from world where id=$1";
  90. const std::string TeBkUmLpqAsyncRouter::WORLD_ALL_QUERY = "select id,randomnumber from world";
  91. const std::string TeBkUmLpqAsyncRouter::FORTUNE_ALL_QUERY = "select id,message from fortune";
  92. std::unordered_map<int, std::string> TeBkUmLpqAsyncRouter::_qC;
  93. int TeBkUmLpqAsyncRouter::g_seed = 0;
  94. void TeBkUmLpqAsyncRouter::dbAsync(BaseSocket* sif) {
  95. LibpqDataSourceImpl* sqli = getDb(5);
  96. int rid = CommonUtils::fastrand(g_seed) % 10000 + 1;
  97. LibpqAsyncReq* areq = sqli->getAsyncRequest();
  98. LibpqQuery* q = areq->getQuery();
  99. q->withParamInt4(rid);
  100. #ifdef HAVE_LIBPQ
  101. q->withSelectQuery(WORLD_ONE_QUERY).withContext(sif).withCb0([](void* ctx, PGresult* res) {
  102. BaseSocket* sif = (BaseSocket*)ctx;
  103. TeBkUmLpqAsyncWorld wo;
  104. int cols = PQnfields(res);
  105. for (int j = 0; j < cols; ++j) {
  106. if(j==0)wo.setId(ntohl(*((uint32_t *) PQgetvalue(res, 0, j))));
  107. else wo.setRandomNumber(ntohl(*((uint32_t *) PQgetvalue(res, 0, j))));
  108. }
  109. HttpResponse r;
  110. std::string h;
  111. #ifdef HAVE_RAPID_JSON
  112. rapidjson::StringBuffer s;
  113. rapidjson::Writer<rapidjson::StringBuffer> w(s);
  114. wo.toJson(w);
  115. r.httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, 1.1, false, s.GetSize());
  116. sif->writeDirect(h, s.GetString(), s.GetSize());
  117. #else
  118. JSONSerialize::serializeObject(&wo, w_ser, r.getContentP());
  119. r.httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON);
  120. sif->writeDirect(h, r.getContent());
  121. #endif
  122. sif->unUse();
  123. });
  124. #endif
  125. sqli->postAsync(areq);
  126. }
  127. void TeBkUmLpqAsyncRouter::queriesAsync(const char* q, int ql, BaseSocket* sif) {
  128. int queryCount = 0;
  129. CommonUtils::fastStrToNum(q, ql, queryCount);
  130. queryCount = std::max(1, std::min(queryCount, 500));
  131. LibpqDataSourceImpl* sqli = getDb(3);
  132. LibpqAsyncReq* areq = sqli->getAsyncRequest();
  133. for (int c = 0; c < queryCount; ++c) {
  134. int rid = CommonUtils::fastrand(g_seed) % 10000 + 1;
  135. LibpqQuery* q = areq->getQuery();
  136. q->withParamInt4(rid);
  137. q->withSelectQuery(WORLD_ONE_QUERY);
  138. }
  139. #ifdef HAVE_LIBPQ
  140. areq->withFinalCb(sif, [](void* ctx, bool status, std::vector<PGresult*>* results, const std::string& q, int counter) {
  141. BaseSocket* sif = (BaseSocket*)ctx;
  142. std::vector<TeBkUmLpqAsyncWorld> vec;
  143. vec.reserve((int)results->size());
  144. for (int i = 0; i < (int)results->size(); ++i) {
  145. PGresult* res = results->at(i);
  146. int cols = PQnfields(res);
  147. for (int j = 0; j < cols; ++j) {
  148. if(j==0) vec.emplace_back(ntohl(*((uint32_t *) PQgetvalue(res, 0, j))));
  149. else vec.back().setRandomNumber(ntohl(*((uint32_t *) PQgetvalue(res, 0, j))));
  150. }
  151. }
  152. HttpResponse r;
  153. std::string h;
  154. #ifdef HAVE_RAPID_JSON
  155. rapidjson::StringBuffer s;
  156. rapidjson::Writer<rapidjson::StringBuffer> w(s);
  157. TeBkUmLpqAsyncWorld::toJson(vec, w);
  158. r.httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, 1.1, false, s.GetSize());
  159. sif->writeDirect(h, s.GetString(), s.GetSize());
  160. #else
  161. JSONSerialize::serializeObjectCont(&vec, wcont_ser, "vector", r.getContentP());
  162. r.httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, 1.1, false);
  163. sif->writeDirect(h, r.getContent());
  164. #endif
  165. sif->unUse();
  166. });
  167. #endif
  168. sqli->postAsync(areq);
  169. }
  170. void TeBkUmLpqAsyncRouter::queriesMultiAsync(const char* q, int ql, BaseSocket* sif) {
  171. int queryCount = 0;
  172. CommonUtils::fastStrToNum(q, ql, queryCount);
  173. queryCount = std::max(1, std::min(queryCount, 500));
  174. LibpqDataSourceImpl* sqli = getDb(3);
  175. std::stringstream ss;
  176. for (int c = 0; c < queryCount; ++c) {
  177. int rid = CommonUtils::fastrand(g_seed) % 10000 + 1;
  178. ss << "select id, randomnumber from world where id = " << rid << ";";
  179. }
  180. LibpqAsyncReq* areq = sqli->getAsyncRequest();
  181. LibpqQuery* qu = areq->getQuery();
  182. qu->withSelectQuery(ss.str()).withMulti();
  183. #ifdef HAVE_LIBPQ
  184. areq->withFinalCb(sif, [](void* ctx, bool status, std::vector<PGresult*>* results, const std::string& q, int counter) {
  185. BaseSocket* sif = (BaseSocket*)ctx;
  186. std::vector<TeBkUmLpqAsyncWorld> vec;
  187. vec.reserve((int)results->size());
  188. for (int i = 0; i < (int)results->size(); ++i) {
  189. PGresult* res = results->at(i);
  190. int cols = PQnfields(res);
  191. for (int j = 0; j < cols; ++j) {
  192. int tmp = 0;
  193. CommonUtils::fastStrToNum(PQgetvalue(res, 0, j), PQgetlength(res, 0, j), tmp);
  194. if(j==0) vec.emplace_back(tmp);
  195. else vec.back().setRandomNumber(tmp);
  196. }
  197. }
  198. HttpResponse r;
  199. std::string h;
  200. #ifdef HAVE_RAPID_JSON
  201. rapidjson::StringBuffer s;
  202. rapidjson::Writer<rapidjson::StringBuffer> w(s);
  203. TeBkUmLpqAsyncWorld::toJson(vec, w);
  204. r.httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, 1.1, false, s.GetSize());
  205. sif->writeDirect(h, s.GetString(), s.GetSize());
  206. #else
  207. JSONSerialize::serializeObjectCont(&vec, wcont_ser, "vector", r.getContentP());
  208. r.httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, 1.1, false);
  209. sif->writeDirect(h, r.getContent());
  210. #endif
  211. sif->unUse();
  212. });
  213. #endif
  214. sqli->postAsync(areq, queryCount);
  215. }
  216. void TeBkUmLpqAsyncRouter::updatesMulti(const char* q, int ql, AsyncUpdatesReq* req) {
  217. int queryCount = 0;
  218. CommonUtils::fastStrToNum(q, ql, queryCount);
  219. queryCount = std::max(1, std::min(queryCount, 500));
  220. req->vec.reserve(queryCount);
  221. req->sqli = getDb(3);
  222. std::stringstream ss;
  223. for (int c = 0; c < queryCount; ++c) {
  224. int rid = CommonUtils::fastrand(g_seed) % 10000 + 1;
  225. ss << "select id, randomnumber from world where id = " << rid << ";";
  226. }
  227. //req->ss << "begin;";//NEVER USE - this creates a deadlock issue (like, DETAIL: Process 16 waits for ShareLock on transaction 995; blocked by process 19.)
  228. LibpqAsyncReq* areq = req->sqli->getAsyncRequest();
  229. LibpqQuery* qu = areq->getQuery();
  230. qu->withSelectQuery(ss.str()).withMulti();
  231. #ifdef HAVE_LIBPQ
  232. areq->withFinalCb(req, [](void* ctx, bool status, std::vector<PGresult*>* results, const std::string& q, int counter) {
  233. AsyncUpdatesReq* req = (AsyncUpdatesReq*)ctx;
  234. if(status) {
  235. int queryCount = (int)results->size();
  236. std::stringstream ss;
  237. for (int i = 0; i < queryCount; ++i) {
  238. PGresult* res = results->at(i);
  239. int cols = PQnfields(res);
  240. for (int j = 0; j < cols; ++j) {
  241. int tmp = 0;
  242. CommonUtils::fastStrToNum(PQgetvalue(res, 0, j), PQgetlength(res, 0, j), tmp);
  243. if(j==0) req->vec.emplace_back(tmp);
  244. else {
  245. TeBkUmLpqAsyncWorld& w = req->vec.back();
  246. int newRandomNumber = CommonUtils::fastrand(g_seed) % 10000 + 1;
  247. if(tmp == newRandomNumber) {
  248. newRandomNumber += 1;
  249. if(newRandomNumber>=10000) {
  250. newRandomNumber = 1;
  251. }
  252. }
  253. w.setRandomNumber(newRandomNumber);
  254. ss << "begin;update world set randomnumber = " << newRandomNumber << " where id = " << w.getId() << ";commit;";
  255. }
  256. }
  257. }
  258. LibpqAsyncReq* areq = req->sqli->getAsyncRequest();
  259. LibpqQuery* qu = areq->getQuery();
  260. qu->withUpdateQuery(ss.str()).withMulti();
  261. areq->withFinalCb(req, [](void* ctx, bool status, std::vector<PGresult*>* results, const std::string& q, int counter) {
  262. AsyncUpdatesReq* req = (AsyncUpdatesReq*)ctx;
  263. if(status) {
  264. HttpResponse r;
  265. std::string h;
  266. #ifdef HAVE_RAPID_JSON
  267. rapidjson::StringBuffer s;
  268. rapidjson::Writer<rapidjson::StringBuffer> w(s);
  269. TeBkUmLpqAsyncWorld::toJson(req->vec, w);
  270. r.httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, 1.1, false, s.GetSize());
  271. req->sif->writeDirect(h, s.GetString(), s.GetSize());
  272. #else
  273. JSONSerialize::serializeObjectCont(&req->vec, wcont_ser, "vector", r.getContentP());
  274. r.httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, 1.1, false);
  275. req->sif->writeDirect(h, r.getContent());
  276. #endif
  277. } else {
  278. HttpResponse r;
  279. std::string h;
  280. r.httpStatus(HTTPResponseStatus::InternalServerError).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, req->httpVers, true);
  281. req->sif->writeDirect(h);
  282. }
  283. req->sif->unUse();
  284. delete req;
  285. });
  286. req->sqli->postAsync(areq, queryCount*3);
  287. }
  288. });
  289. #endif
  290. req->sqli->postAsync(areq, queryCount);
  291. }
  292. std::string& TeBkUmLpqAsyncRouter::getUpdQuery(int count) {
  293. std::unordered_map<int, std::string>::iterator it = _qC.find(count);
  294. if(it!=_qC.end()) {
  295. return it->second;
  296. }
  297. std::stringstream ss;
  298. ss << "update world as t set randomnumber = case id";
  299. int pc = 1;
  300. for (int c = 0; c < count; ++c) {
  301. ss << " when $";
  302. ss << pc++;
  303. ss << " then $";
  304. ss << pc++;
  305. }
  306. ss << " else randomnumber end where id in (";
  307. for (int c = 0; c < count; ++c) {
  308. ss << "$" << pc++ << ",";
  309. }
  310. std::string q = ss.str();
  311. q = q.substr(0, q.length()-1);
  312. q += ")";
  313. _qC[count] = std::move(q);
  314. return _qC[count];
  315. }
  316. void TeBkUmLpqAsyncRouter::updatesAsyncb(const char* q, int ql, AsyncUpdatesReq* req) {
  317. int queryCount = 0;
  318. CommonUtils::fastStrToNum(q, ql, queryCount);
  319. queryCount = std::max(1, std::min(queryCount, 500));
  320. req->vec.reserve(queryCount);
  321. req->sqli = getDb(3);
  322. LibpqAsyncReq* areq = req->sqli->getAsyncRequest();
  323. for (int c = 0; c < queryCount; ++c) {
  324. int rid = CommonUtils::fastrand(g_seed) % 10000 + 1;
  325. LibpqQuery* q = areq->getQuery();
  326. q->withParamInt4(rid);
  327. q->withSelectQuery(WORLD_ONE_QUERY);
  328. }
  329. #ifdef HAVE_LIBPQ
  330. areq->withFinalCb(req, [](void* ctx, bool status, std::vector<PGresult*>* results, const std::string& query, int counter) {
  331. AsyncUpdatesReq* req = (AsyncUpdatesReq*)ctx;
  332. int queryCount = (int)results->size();
  333. LibpqAsyncReq* areq = req->sqli->getAsyncRequest();
  334. req->sqli->beginAsync(areq);
  335. LibpqQuery* q = areq->getQuery();
  336. q->withUpdateQuery(getUpdQuery(queryCount)).withContext(req);
  337. for (int i = 0; i < queryCount; ++i) {
  338. PGresult* res = results->at(i);
  339. int cols = PQnfields(res);
  340. for (int j = 0; j < cols; ++j) {
  341. if(j==0) req->vec.emplace_back(ntohl(*((uint32_t *) PQgetvalue(res, 0, j))));
  342. else {
  343. int tmp = ntohl(*((uint32_t *) PQgetvalue(res, 0, j)));
  344. TeBkUmLpqAsyncWorld& w = req->vec.back();
  345. int newRandomNumber = CommonUtils::fastrand(g_seed) % 10000 + 1;
  346. if(tmp == newRandomNumber) {
  347. newRandomNumber += 1;
  348. if(newRandomNumber>=10000) {
  349. newRandomNumber = 1;
  350. }
  351. }
  352. w.setRandomNumber(newRandomNumber);
  353. q->withParamInt4(w.getId());
  354. q->withParamInt4(w.getRandomNumber());
  355. }
  356. }
  357. }
  358. for(auto w: req->vec) {
  359. q->withParamInt4(w.getId());
  360. }
  361. req->sqli->commitAsync(areq);
  362. areq->withFinalCb(req, [](void* ctx, bool status, std::vector<PGresult*>* results, const std::string& query, int counter) {
  363. AsyncUpdatesReq* req = (AsyncUpdatesReq*)ctx;
  364. if(status) {
  365. HttpResponse r;
  366. std::string h;
  367. #ifdef HAVE_RAPID_JSON
  368. rapidjson::StringBuffer s;
  369. rapidjson::Writer<rapidjson::StringBuffer> w(s);
  370. TeBkUmLpqAsyncWorld::toJson(req->vec, w);
  371. r.httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, 1.1, false, s.GetSize());
  372. req->sif->writeDirect(h, s.GetString(), s.GetSize());
  373. #else
  374. JSONSerialize::serializeObjectCont(&req->vec, wcont_ser, "vector", r.getContentP());
  375. r.httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, 1.1, false);
  376. req->sif->writeDirect(h, r.getContent());
  377. #endif
  378. } else {
  379. HttpResponse r;
  380. std::string h;
  381. r.httpStatus(HTTPResponseStatus::InternalServerError).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, req->httpVers, true);
  382. req->sif->writeDirect(h);
  383. }
  384. req->sif->unUse();
  385. delete req;
  386. });
  387. req->sqli->postAsync(areq);
  388. });
  389. #endif
  390. req->sqli->postAsync(areq);
  391. }
  392. void TeBkUmLpqAsyncRouter::updatesAsync(const char* q, int ql, AsyncUpdatesReq* req) {
  393. int queryCount = 0;
  394. CommonUtils::fastStrToNum(q, ql, queryCount);
  395. queryCount = std::max(1, std::min(queryCount, 500));
  396. req->vec.reserve(queryCount);
  397. req->sqli = getDb(3);
  398. LibpqAsyncReq* areq = req->sqli->getAsyncRequest();
  399. for (int c = 0; c < queryCount; ++c) {
  400. int rid = CommonUtils::fastrand(g_seed) % 10000 + 1;
  401. LibpqQuery* qu = areq->getQuery();
  402. qu->withParamInt4(rid);
  403. qu->withSelectQuery(WORLD_ONE_QUERY);
  404. }
  405. #ifdef HAVE_LIBPQ
  406. areq->withFinalCb(req, [](void* ctx, bool status, std::vector<PGresult*>* results, const std::string& query, int counter) {
  407. AsyncUpdatesReq* req = (AsyncUpdatesReq*)ctx;
  408. LibpqAsyncReq* areq = req->sqli->getAsyncRequest();
  409. for (int i = 0; i < (int)results->size(); ++i) {
  410. PGresult* res = results->at(i);
  411. int cols = PQnfields(res);
  412. for (int j = 0; j < cols; ++j) {
  413. if(j==0) req->vec.emplace_back(ntohl(*((uint32_t *) PQgetvalue(res, 0, j))));
  414. else {
  415. int tmp = ntohl(*((uint32_t *) PQgetvalue(res, 0, j)));
  416. TeBkUmLpqAsyncWorld& w = req->vec.back();
  417. int newRandomNumber = CommonUtils::fastrand(g_seed) % 10000 + 1;
  418. if(tmp == newRandomNumber) {
  419. newRandomNumber += 1;
  420. if(newRandomNumber>=10000) {
  421. newRandomNumber = 1;
  422. }
  423. }
  424. w.setRandomNumber(newRandomNumber);
  425. std::stringstream ss;
  426. ss << "update world set randomnumber = " << newRandomNumber << " where id = " << w.getId();
  427. req->sqli->beginAsync(areq);
  428. LibpqQuery* q = areq->getQuery();
  429. q->withUpdateQuery(ss.str(), false);
  430. req->sqli->commitAsync(areq);
  431. }
  432. }
  433. }
  434. areq->withFinalCb(req, [](void* ctx, bool status, std::vector<PGresult*>* results, const std::string& query, int counter) {
  435. AsyncUpdatesReq* req = (AsyncUpdatesReq*)ctx;
  436. if(status) {
  437. HttpResponse r;
  438. std::string h;
  439. #ifdef HAVE_RAPID_JSON
  440. rapidjson::StringBuffer s;
  441. rapidjson::Writer<rapidjson::StringBuffer> w(s);
  442. TeBkUmLpqAsyncWorld::toJson(req->vec, w);
  443. r.httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, 1.1, false, s.GetSize());
  444. req->sif->writeDirect(h, s.GetString(), s.GetSize());
  445. #else
  446. JSONSerialize::serializeObjectCont(&req->vec, wcont_ser, "vector", r.getContentP());
  447. r.httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, 1.1, false);
  448. req->sif->writeDirect(h, r.getContent());
  449. #endif
  450. } else {
  451. HttpResponse r;
  452. std::string h;
  453. r.httpStatus(HTTPResponseStatus::InternalServerError).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, req->httpVers, true);
  454. req->sif->writeDirect(h);
  455. }
  456. req->sif->unUse();
  457. delete req;
  458. });
  459. req->sqli->postAsync(areq);
  460. });
  461. #endif
  462. req->sqli->postAsync(areq);
  463. }
  464. void TeBkUmLpqAsyncRouter::updateCache() {
  465. LibpqDataSourceImpl* sqli = getDb(1);
  466. AsyncCacheReq* req = new AsyncCacheReq;
  467. req->cchi = CacheManager::getImpl();
  468. LibpqAsyncReq* areq = sqli->getAsyncRequest();
  469. LibpqQuery* q = areq->getQuery();
  470. q->withSelectQuery(WORLD_ALL_QUERY).withContext(req).withCb3([](void* ctx, bool endofdata, int row, int col, char* value) {
  471. AsyncCacheReq* req = (AsyncCacheReq*)ctx;
  472. if(col==0) {
  473. req->vec.emplace_back(ntohl(*((uint32_t *) value)));
  474. } else {
  475. req->vec.back().setRandomNumber(ntohl(*((uint32_t *) value)));
  476. }
  477. if(endofdata) {
  478. CacheInterface* cchi = req->cchi;
  479. try {
  480. for(std::vector<TeBkUmLpqAsyncWorld>::iterator it=req->vec.begin(); it != req->vec.end(); ++it) {
  481. char str[12];
  482. sprintf(str, "%d;%d", (*it).getId(), (*it).getRandomNumber());
  483. cchi->setRaw((*it).getId(), str);
  484. }
  485. CacheManager::cleanImpl(cchi);
  486. delete req;
  487. CacheManager::triggerAppInitCompletion("t4");
  488. } catch(const std::exception& e) {
  489. CacheManager::cleanImpl(cchi);
  490. delete req;
  491. }
  492. }
  493. });
  494. sqli->postAsync(areq);
  495. }
  496. void TeBkUmLpqAsyncRouter::cachedWorlds(const char* q, int ql, std::vector<TeBkUmLpqAsyncWorld>& wlst) {
  497. int queryCount = 0;
  498. CommonUtils::fastStrToNum(q, ql, queryCount);
  499. queryCount = std::max(1, std::min(queryCount, 500));
  500. wlst.reserve(queryCount);
  501. CacheInterface* cchi = CacheManager::getImpl();
  502. std::vector<unsigned long long> keys;
  503. for (int c = 0; c < queryCount; ++c) {
  504. keys.emplace_back(CommonUtils::fastrand(g_seed) % 10000 + 1);
  505. }
  506. std::vector<std::string> values;
  507. cchi->getValues(keys, values);
  508. for (int c = 0; c < queryCount; ++c) {
  509. std::string& v = values.at(c);
  510. size_t fn = v.find(";");
  511. int tmp = 0;
  512. CommonUtils::fastStrToNum(v.substr(0, fn).c_str(), fn, tmp);
  513. int tmp1 = 0;
  514. CommonUtils::fastStrToNum(v.substr(fn+1).c_str(), v.length()-fn-1, tmp1);
  515. wlst.emplace_back(tmp, tmp1);
  516. }
  517. CacheManager::cleanImpl(cchi);
  518. }
  519. void TeBkUmLpqAsyncRouter::fortunes(BaseSocket* sif) {
  520. LibpqDataSourceImpl* sqli = getDb(7);
  521. LibpqAsyncReq* areq = sqli->getAsyncRequest();
  522. LibpqQuery* q = areq->getQuery();
  523. #ifdef HAVE_LIBPQ
  524. q->withSelectQuery(FORTUNE_ALL_QUERY).withContext(sif).withCb0([](void* ctx, PGresult* res) {
  525. BaseSocket* sif = (BaseSocket*)ctx;
  526. std::list<TeBkUmLpqAsyncFortune> flst;
  527. int cols = PQnfields(res);
  528. int rows = PQntuples(res);
  529. for(int i=0; i<rows; i++) {
  530. for (int j = 0; j < cols; ++j) {
  531. if(j==0) {
  532. flst.emplace_back(ntohl(*((uint32_t *) PQgetvalue(res, i, j))));
  533. } else {
  534. TeBkUmLpqAsyncFortune& w = flst.back();
  535. w.message = CryptoHandler::sanitizeHtmlFast((const uint8_t *)PQgetvalue(res, i, j), (size_t)PQgetlength(res, i, j), w.message_i, w.allocd);
  536. }
  537. }
  538. }
  539. Context context;
  540. flst.emplace_back(0, "Additional fortune added at request time.");
  541. flst.sort();
  542. context.emplace("fortunes", &flst);
  543. fcpstream str;
  544. tmplFunc(&context, str);
  545. std::string out = str.str();
  546. HttpResponse r;
  547. std::string h;
  548. r.httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_TEXT_HTML, 1.1, false, (int)out.length());
  549. sif->writeDirect(h, out);
  550. sif->unUse();
  551. });
  552. #endif
  553. sqli->postAsync(areq);
  554. }
  555. bool TeBkUmLpqAsyncRouter::route(HttpRequest* req, HttpResponse* res, BaseSocket* sif) {
  556. sif->use();
  557. if(StringUtil::endsWith(req->getPath(), "/plaint")) {
  558. std::string h;
  559. res->httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_TEXT_PLAIN, (int)HELLO_WORLD.length());
  560. sif->writeDirect(h, HELLO_WORLD);
  561. sif->unUse();
  562. } else if(StringUtil::endsWith(req->getPath(), "/j")) {
  563. TeBkUmLpqAsyncMessage msg;
  564. msg.setMessage(HELLO_WORLD);
  565. std::string h;
  566. #ifdef HAVE_RAPID_JSON
  567. rapidjson::StringBuffer s;
  568. rapidjson::Writer<rapidjson::StringBuffer> w(s);
  569. msg.toJson(w);
  570. res->httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, 1.1, false, s.GetSize());
  571. sif->writeDirect(h, s.GetString(), s.GetSize());
  572. #else
  573. JSONSerialize::serializeObject(&msg, m_ser, res->getContentP());
  574. res->httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON);
  575. sif->writeDirect(h, res->getContent());
  576. #endif
  577. sif->unUse();
  578. } else if(StringUtil::endsWith(req->getPath(), "/d")) {
  579. /*AsyncDbReq* ar = new AsyncDbReq;
  580. ar->sif = sif;
  581. ar->httpVers = req->getHttpVers();
  582. ar->conn_clos = req->isClose();*/
  583. dbAsync(sif);
  584. } else if(StringUtil::endsWith(req->getPath(), "/quer")) {
  585. struct yuarel_param params[1];
  586. yuarel_parse_query((char*)req->getQueryStr().data(), req->getQueryStr().size(), params, 1);
  587. /*AsyncQueriesReq* ar = new AsyncQueriesReq;
  588. ar->sif = sif;
  589. ar->httpVers = req->getHttpVers();
  590. ar->conn_clos = req->isClose();*/
  591. queriesAsync(params[0].val, params[0].val_len, sif);
  592. } else if(StringUtil::endsWith(req->getPath(), "/quem")) {
  593. struct yuarel_param params[1];
  594. yuarel_parse_query((char*)req->getQueryStr().data(), req->getQueryStr().size(), params, 1);
  595. /*AsyncQueriesReq* ar = new AsyncQueriesReq;
  596. ar->sif = sif;
  597. ar->httpVers = req->getHttpVers();
  598. ar->conn_clos = req->isClose();*/
  599. queriesMultiAsync(params[0].val, params[0].val_len, sif);
  600. } else if(StringUtil::endsWith(req->getPath(), "/updm")) {
  601. struct yuarel_param params[1];
  602. yuarel_parse_query((char*)req->getQueryStr().data(), req->getQueryStr().size(), params, 1);
  603. AsyncUpdatesReq* ar = new AsyncUpdatesReq;
  604. ar->sif = sif;
  605. ar->httpVers = req->getHttpVers();
  606. ar->conn_clos = req->isClose();
  607. updatesMulti(params[0].val, params[0].val_len, ar);
  608. } else if(StringUtil::endsWith(req->getPath(), "/fortu")) {
  609. /*AsyncFortuneReq* ar = new AsyncFortuneReq;
  610. ar->sif = sif;
  611. ar->httpVers = req->getHttpVers();
  612. ar->conn_clos = req->isClose();*/
  613. fortunes(sif);
  614. } else if(StringUtil::endsWith(req->getPath(), "/bupdt") || StringUtil::endsWith(req->getPath(), "/updt")) {
  615. struct yuarel_param params[1];
  616. yuarel_parse_query((char*)req->getQueryStr().data(), req->getQueryStr().size(), params, 1);
  617. AsyncUpdatesReq* ar = new AsyncUpdatesReq;
  618. ar->sif = sif;
  619. ar->httpVers = req->getHttpVers();
  620. ar->conn_clos = req->isClose();
  621. updatesAsyncb(params[0].val, params[0].val_len, ar);
  622. } else if(StringUtil::endsWith(req->getPath(), "/upd_")) {
  623. struct yuarel_param params[1];
  624. yuarel_parse_query((char*)req->getQueryStr().data(), req->getQueryStr().size(), params, 1);
  625. AsyncUpdatesReq* ar = new AsyncUpdatesReq;
  626. ar->sif = sif;
  627. ar->httpVers = req->getHttpVers();
  628. ar->conn_clos = req->isClose();
  629. updatesAsync(params[0].val, params[0].val_len, ar);
  630. } else if(StringUtil::endsWith(req->getPath(), "/cached-wld")) {
  631. struct yuarel_param params[1];
  632. yuarel_parse_query((char*)req->getQueryStr().data(), req->getQueryStr().size(), params, 1);
  633. std::vector<TeBkUmLpqAsyncWorld> vec;
  634. cachedWorlds(params[0].val, params[0].val_len, vec);
  635. std::string h;
  636. #ifdef HAVE_RAPID_JSON
  637. rapidjson::StringBuffer s;
  638. rapidjson::Writer<rapidjson::StringBuffer> w(s);
  639. TeBkUmLpqAsyncWorld::toJson(vec, w);
  640. res->httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, 1.1, false, s.GetSize());
  641. sif->writeDirect(h, s.GetString(), s.GetSize());
  642. #else
  643. JSONSerialize::serializeObjectCont(&vec, wcont_ser, "vector", res->getContentP());
  644. res->httpStatus(HTTPResponseStatus::Ok).generateHeadResponse(h, ContentTypes::CONTENT_TYPE_APPLICATION_JSON, 1.1, false);
  645. sif->writeDirect(h, res->getContent());
  646. #endif
  647. sif->unUse();
  648. } else {
  649. std::string h;
  650. res->httpStatus(HTTPResponseStatus::NotFound).generateHeadResponse(h, req->getHttpVers(), true);
  651. sif->writeDirect(h);
  652. sif->unUse();
  653. }
  654. return false;
  655. }
  656. TemplatePtr TeBkUmLpqAsyncRouter::tmplFunc;
  657. Ser TeBkUmLpqAsyncRouter::m_ser;
  658. Ser TeBkUmLpqAsyncRouter::w_ser;
  659. SerCont TeBkUmLpqAsyncRouter::wcont_ser;
  660. TeBkUmLpqAsyncRouter::TeBkUmLpqAsyncRouter() {
  661. sqli = NULL;
  662. tmplFunc = TemplateUtil::getTemplateFunc("t4", "tpe/fortunes.tpe");
  663. m_ser = Serializer::getSerFuncForObject("t4", "TeBkUmLpqAsyncMessage");
  664. w_ser = Serializer::getSerFuncForObject("t4", "TeBkUmLpqAsyncWorld");
  665. wcont_ser = Serializer::getSerFuncForObjectCont("t4", "TeBkUmLpqAsyncWorld", "std::vector");
  666. }
  667. TeBkUmLpqAsyncRouter::~TeBkUmLpqAsyncRouter() {
  668. if(sqli!=NULL) {
  669. DataSourceManager::cleanRawImpl(sqli);
  670. }
  671. }
  672. LibpqDataSourceImpl* TeBkUmLpqAsyncRouter::getDb(int max) {
  673. if(sqli==NULL) {
  674. sqli = static_cast<LibpqDataSourceImpl*>(DataSourceManager::getRawImpl("PostgreSQL-DSN", "t4"));
  675. }
  676. return sqli;
  677. }
  678. LibpqDataSourceImpl* TeBkUmLpqAsyncRouterPooled::getDb(int max) {
  679. if(max==0) {
  680. max = maxconns;
  681. } else {
  682. max = std::min(max, maxconns);
  683. }
  684. int pc = 0;
  685. if(inited) {
  686. pc = ++opt;
  687. if(pc>=INT_MAX-1) {
  688. opt = 0;
  689. }
  690. } else {
  691. for (int var = 0; var < maxconns; ++var) {
  692. pool.push_back(static_cast<LibpqDataSourceImpl*>(DataSourceManager::getRawImpl("PostgreSQL-DSN", "t4", true)));
  693. }
  694. inited = true;
  695. }
  696. return pool.at(pc%max);
  697. }
  698. TeBkUmLpqAsyncRouterPooled::TeBkUmLpqAsyncRouterPooled() {
  699. maxconns = 7;
  700. propMap props = ConfigurationData::getAppProperties();
  701. if(props.size()>0) {
  702. if(props.find("dbpoolsize")!=props.end()) {
  703. try {
  704. maxconns = CastUtil::toInt(props["dbpoolsize"]);
  705. } catch(...) {
  706. }
  707. }
  708. }
  709. inited = false;
  710. opt = 0;
  711. }
  712. TeBkUmLpqAsyncRouterPooled::~TeBkUmLpqAsyncRouterPooled() {
  713. for(auto sqli: pool) {
  714. if(sqli!=NULL) {
  715. DataSourceManager::cleanRawImpl(sqli);
  716. }
  717. }
  718. }