main.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401
  1. #include <boost/beast/core.hpp>
  2. #include <boost/beast/http.hpp>
  3. #include <boost/beast/version.hpp>
  4. #include <boost/asio/ip/tcp.hpp>
  5. #include <boost/asio/awaitable.hpp>
  6. #include <boost/asio/co_spawn.hpp>
  7. #include <boost/asio/use_awaitable.hpp>
  8. #include <boost/config.hpp>
  9. #include <boost/json/src.hpp>
  10. #include <algorithm>
  11. #include <cstdlib>
  12. #include <ctime>
  13. #include <iostream>
  14. #include <memory>
  15. #include <string>
  16. #include <thread>
  17. #include <vector>
  18. #include <random>
  19. #if defined(BOOST_ASIO_HAS_CO_AWAIT)
  20. #include <libpq-fe.h>
  21. namespace beast = boost::beast; // from <boost/beast.hpp>
  22. namespace http = beast::http; // from <boost/beast/http.hpp>
  23. namespace net = boost::asio; // from <boost/asio.hpp>
  24. using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
  25. namespace json = boost::json; // from <boost/json.hpp>
  26. using tcp_stream = typename beast::tcp_stream::rebind_executor<
  27. net::use_awaitable_t<>::executor_with_default<net::any_io_executor>>::other;
  28. namespace becpp
  29. {
  30. using result_ptr = std::unique_ptr<PGresult, decltype(&PQclear)>;
  31. //https://gist.github.com/ictlyh/12fe787ec265b33fd7e4b0bd08bc27cb
  32. result_ptr prepare(PGconn* conn,
  33. const char* stmtName,
  34. const char* command,
  35. uint8_t nParams)
  36. {
  37. auto res = PQprepare(conn, stmtName, command, nParams, nullptr);
  38. if (PQresultStatus(res) != PGRES_COMMAND_OK)
  39. {
  40. std::cerr << "PQprepare failed: " << PQresultErrorMessage(res)
  41. << std::endl;
  42. PQclear(res);
  43. return {nullptr, nullptr};
  44. }
  45. return {res, &PQclear};
  46. }
  47. result_ptr execute(PGconn* conn,
  48. const char* stmtName,
  49. uint8_t nParams,
  50. const char* const* paramValues,
  51. const int* paramLengths = nullptr,
  52. const int* paramFormats = nullptr,
  53. int resultFormat = 0)
  54. {
  55. auto res = PQexecPrepared(conn, stmtName, nParams, paramValues, paramLengths,
  56. paramFormats, resultFormat);
  57. const auto status = PQresultStatus(res);
  58. if (status != PGRES_COMMAND_OK &&
  59. status != PGRES_TUPLES_OK &&
  60. status != PGRES_SINGLE_TUPLE)
  61. {
  62. std::cerr << "PQexecPrepared failed: " << PQresultErrorMessage(res)
  63. << std::endl;
  64. PQclear(res);
  65. return {nullptr, nullptr};
  66. }
  67. return {res, &PQclear};
  68. }
  /// Looks up the environment variable `env_var`.
  ///
  /// @return the variable's value when it is set, otherwise `default_value`.
  const char* env(const char* env_var, const char* default_value)
  {
    const char* const value = std::getenv(env_var);
    return value != nullptr ? value : default_value;
  }
  /// Formats the current time as an HTTP date, e.g.
  /// "Wed, 17 Apr 2013 12:00:00 GMT".
  ///
  /// The time is broken down in UTC with the re-entrant gmtime variant, because
  /// this function is called from several I/O threads. The zone is the literal
  /// "GMT" and the clock fields are explicit, so the output always fits the
  /// fixed-size buffer.
  ///
  /// @return the formatted date, or an empty string if the time could not be
  ///         converted or formatted.
  std::string now_string()
  {
    const std::time_t now = std::time(nullptr);
    std::tm utc{};
  #if defined(_WIN32)
    if (gmtime_s(&utc, &now) != 0)
      return {};
  #else
    if (gmtime_r(&now, &utc) == nullptr)
      return {};
  #endif
    char buffer[sizeof "Wed, 17 Apr 2013 12:00:00 GMT"];
    // strftime returns 0 when the output does not fit; the buffer contents are
    // then unspecified, so only the reported length is used.
    const std::size_t length =
        std::strftime(buffer, sizeof buffer, "%a, %d %b %Y %H:%M:%S GMT", &utc);
    return std::string(buffer, length);
  }
  82. template <class Body, class Allocator>
  83. http::message_generator
  84. handle_error(
  85. http::request<Body, http::basic_fields<Allocator>>&& req,
  86. http::status status,
  87. beast::string_view msg)
  88. {
  89. http::response<http::string_body> res{status, req.version()};
  90. res.set(http::field::server, BOOST_BEAST_VERSION_STRING);
  91. res.set(http::field::content_type, "text/html");
  92. res.set(http::field::date, now_string());
  93. res.keep_alive(req.keep_alive());
  94. res.body() = std::string(msg);
  95. res.prepare_payload();
  96. return res;
  97. }
  98. template <class Body, class Allocator>
  99. http::message_generator
  100. handle_target(
  101. http::request<Body, http::basic_fields<Allocator>>&& req,
  102. PGconn* conn = nullptr)
  103. {
  104. //std::cout << "handle_target: " << req.target() << std::endl;
  105. http::response<http::string_body> res{http::status::ok, req.version()};
  106. res.set(http::field::server, BOOST_BEAST_VERSION_STRING);
  107. res.set(http::field::content_type, "application/json");
  108. res.set(http::field::date, now_string());
  109. res.keep_alive(req.keep_alive());
  110. if (req.target() == "/json")
  111. {
  112. // {"message":"Hello, World!"}
  113. json::object obj;
  114. obj["message"] = "Hello, World!";
  115. res.body() = json::serialize(obj);
  116. }
  117. else if (req.target() == "/plaintext")
  118. {
  119. res.set(http::field::content_type, "text/plain");
  120. res.body() = "Hello, World!";
  121. }
  122. else if (req.target() == "/db" || req.target().starts_with("/queries/"))
  123. {
  124. static std::random_device rd;
  125. static std::minstd_rand gen(rd());
  126. static std::uniform_int_distribution<> distrib(1, 10000);
  127. static const char* word_query = "SELECT randomNumber FROM world WHERE id=$1";
  128. thread_local auto stmt = prepare(conn, "word_query_stmt", word_query, 1);
  129. if (req.target() == "/db")
  130. {
  131. const unsigned uint_id = distrib(gen);
  132. const auto str_id = std::to_string(uint_id);
  133. const char* char_ptr_id = str_id.c_str();
  134. if (auto rs = execute(conn, "word_query_stmt", 1, &char_ptr_id))
  135. {
  136. // {"id":3217,"randomNumber":2149}
  137. json::object obj;
  138. obj["id"] = uint_id;
  139. obj["randomNumber"] = std::atoi(PQgetvalue(rs.get(), 0, 0));
  140. res.body() = json::serialize(obj);
  141. //std::cout << "res.body(): " << res.body() << std::endl;
  142. }
  143. else
  144. return handle_error(std::move(req),
  145. http::status::internal_server_error,
  146. "internal_server_error");
  147. }
  148. else if (req.target().starts_with("/queries/"))
  149. {
  150. int n_queries = 1;
  151. try
  152. {
  153. const int n = std::stoi(req.target().substr(req.target().find_last_of('/')+1));
  154. if (n > 1) n_queries = n;
  155. } catch(...) {}
  156. if (n_queries > 500) n_queries = 500;
  157. json::array objs;
  158. for (auto i = 0; i < n_queries; ++i)
  159. {
  160. const unsigned uint_id = distrib(gen);
  161. const auto str_id = std::to_string(uint_id);
  162. const char* char_ptr_id = str_id.c_str();
  163. if (auto rs = execute(conn, "word_query_stmt", 1, &char_ptr_id))
  164. {
  165. // {"id":3217,"randomNumber":2149}
  166. json::object obj;
  167. obj["id"] = uint_id;
  168. obj["randomNumber"] = std::atoi(PQgetvalue(rs.get(), 0, 0));
  169. objs.push_back(obj);
  170. }
  171. else
  172. return handle_error(std::move(req),
  173. http::status::internal_server_error,
  174. "internal_server_error");
  175. }
  176. res.body() = json::serialize(objs);
  177. //std::cout << "res.body(): " << res.body() << std::endl;
  178. }
  179. }
  180. else
  181. {
  182. return handle_error(std::move(req),
  183. http::status::not_found,
  184. "Unhandled target: '" + std::string(req.target()) + "'");
  185. }
  186. res.prepare_payload();
  187. return res;
  188. }
  189. // Return a response for the given request.
  190. //
  191. // The concrete type of the response message (which depends on the
  192. // request), is type-erased in message_generator.
  193. template <class Body, class Allocator>
  194. http::message_generator
  195. handle_request(
  196. http::request<Body, http::basic_fields<Allocator>>&& req)
  197. {
  198. // Make sure we can handle the method
  199. if (req.method() != http::verb::get)
  200. return handle_error(std::move(req),
  201. http::status::not_found,
  202. "Unhandled method: '" + std::string(req.method_string()) + "'");
  203. if (req.target() == "/json" || req.target() == "/plaintext")
  204. return handle_target(std::move(req));
  205. thread_local std::unique_ptr<PGconn, decltype(&PQfinish)> conn
  206. {
  207. PQconnectdb(env("BCPP_PG_CONN_STR", "")),
  208. &PQfinish
  209. };
  210. if (PQstatus(conn.get()) != CONNECTION_OK)
  211. {
  212. auto msg = PQerrorMessage(conn.get());
  213. std::cerr << "PQerrorMessage: " << msg << std::endl;
  214. return handle_error(std::move(req),
  215. http::status::internal_server_error,
  216. msg);
  217. }
  218. return handle_target(std::move(req), conn.get());
  219. }
  220. }
  221. //------------------------------------------------------------------------------
  222. // Handles an HTTP server connection
  223. net::awaitable<void>
  224. do_session(tcp_stream stream)
  225. {
  226. // This buffer is required to persist across reads
  227. beast::flat_buffer buffer;
  228. // This lambda is used to send messages
  229. try
  230. {
  231. for(;;)
  232. {
  233. // Set the timeout.
  234. stream.expires_after(std::chrono::seconds(30));
  235. // Read a request
  236. http::request<http::string_body> req;
  237. co_await http::async_read(stream, buffer, req);
  238. // Handle the request
  239. http::message_generator msg =
  240. becpp::handle_request(std::move(req));
  241. // Determine if we should close the connection
  242. bool keep_alive = msg.keep_alive();
  243. // Send the response
  244. co_await beast::async_write(stream, std::move(msg), net::use_awaitable);
  245. if(! keep_alive)
  246. {
  247. // This means we should close the connection, usually because
  248. // the response indicated the "Connection: close" semantic.
  249. break;
  250. }
  251. }
  252. }
  253. catch (boost::system::system_error & se)
  254. {
  255. if (se.code() != http::error::end_of_stream )
  256. throw ;
  257. }
  258. // Send a TCP shutdown
  259. beast::error_code ec;
  260. stream.socket().shutdown(tcp::socket::shutdown_send, ec);
  261. // At this point the connection is closed gracefully
  262. // we ignore the error because the client might have
  263. // dropped the connection already.
  264. }
  265. //------------------------------------------------------------------------------
  266. // Accepts incoming connections and launches the sessions
  267. net::awaitable<void>
  268. do_listen(tcp::endpoint endpoint)
  269. {
  270. // Open the acceptor
  271. auto acceptor = net::use_awaitable.as_default_on(tcp::acceptor(co_await net::this_coro::executor));
  272. acceptor.open(endpoint.protocol());
  273. // Allow address reuse
  274. acceptor.set_option(net::socket_base::reuse_address(true));
  275. // Bind to the server address
  276. acceptor.bind(endpoint);
  277. // Start listening for connections
  278. acceptor.listen(net::socket_base::max_listen_connections);
  279. for(;;)
  280. boost::asio::co_spawn(
  281. acceptor.get_executor(),
  282. do_session(tcp_stream(co_await acceptor.async_accept())),
  283. [](std::exception_ptr e)
  284. {
  285. if (e)
  286. try
  287. {
  288. std::rethrow_exception(e);
  289. }
  290. catch (std::exception&){}
  291. // catch (std::exception &e) {
  292. // std::cerr << "Error in session: " << e.what() << "\n";
  293. // }
  294. });
  295. }
  296. int main(int argc, char* argv[])
  297. {
  298. auto const address = net::ip::make_address(becpp::env("BCPP_ADDRESS", "0.0.0.0"));
  299. auto const port = static_cast<unsigned short>(std::atoi(becpp::env("BCPP_PORT", "8000")));
  300. auto env_threads = std::atoi(becpp::env("BCPP_N_THREADS", "0"));
  301. if (env_threads == 0)
  302. {
  303. env_threads = std::thread::hardware_concurrency();
  304. std::cout << "Using number of cores: " << env_threads << '\n';
  305. }
  306. auto const threads = std::max<int>(1, env_threads);
  307. std::cout << "__GNUG__=" << __GNUG__ << '\n';
  308. std::cout << "__cplusplus=" << __cplusplus << '\n';
  309. std::cout << "__TIMESTAMP__=" << __TIMESTAMP__ << '\n';
  310. std::cout << "__GNUC_EXECUTION_CHARSET_NAME=" << __GNUC_EXECUTION_CHARSET_NAME << '\n';
  311. std::cout << "Listening " << address << ':' << port << " threads=" << threads << std::endl;
  312. // The io_context is required for all I/O
  313. net::io_context ioc{threads};
  314. // Spawn a listening port
  315. boost::asio::co_spawn(ioc,
  316. do_listen(tcp::endpoint{address, port}),
  317. [](std::exception_ptr e)
  318. {
  319. if (e)
  320. try
  321. {
  322. std::rethrow_exception(e);
  323. }
  324. catch(std::exception & e)
  325. {
  326. std::cerr << "Error in acceptor: " << e.what() << "\n";
  327. }
  328. });
  329. // Run the I/O service on the requested number of threads
  330. try {
  331. std::vector<std::thread> v;
  332. v.reserve(threads - 1);
  333. for(auto i = threads - 1; i > 0; --i)
  334. v.emplace_back(
  335. [&ioc]
  336. {
  337. ioc.run();
  338. });
  339. ioc.run();
  340. } catch (std::exception& e)
  341. {
  342. std::cerr << "Error in main: " << e.what() << "\n";
  343. }
  344. return EXIT_SUCCESS;
  345. }
  346. #else
  // Fallback entry point for builds where BOOST_ASIO_HAS_CO_AWAIT is not
  // defined: reports the missing coroutine support and exits with status 1.
  int main(int, char * [])
  {
    std::puts("awaitables require C++20");
    return 1;
  }
  352. #endif