main.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396
  1. #include <boost/beast/core.hpp>
  2. #include <boost/beast/http.hpp>
  3. #include <boost/beast/version.hpp>
  4. #include <boost/asio/ip/tcp.hpp>
  5. #include <boost/asio/awaitable.hpp>
  6. #include <boost/asio/co_spawn.hpp>
  7. #include <boost/asio/use_awaitable.hpp>
  8. #include <boost/config.hpp>
  9. #include <boost/json/src.hpp>
  10. #include <algorithm>
  11. #include <cstdlib>
  12. #include <ctime>
  13. #include <iostream>
  14. #include <memory>
  15. #include <string>
  16. #include <thread>
  17. #include <vector>
  18. #include <random>
  19. #if defined(BOOST_ASIO_HAS_CO_AWAIT)
  20. #include <libpq-fe.h>
  21. namespace beast = boost::beast; // from <boost/beast.hpp>
  22. namespace http = beast::http; // from <boost/beast/http.hpp>
  23. namespace net = boost::asio; // from <boost/asio.hpp>
  24. using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
  25. namespace json = boost::json; // from <boost/json.hpp>
  26. using tcp_stream = typename beast::tcp_stream::rebind_executor<
  27. net::use_awaitable_t<>::executor_with_default<net::any_io_executor>>::other;
  28. namespace becpp
  29. {
  30. using result_ptr = std::unique_ptr<PGresult, decltype(&PQclear)>;
  31. //https://gist.github.com/ictlyh/12fe787ec265b33fd7e4b0bd08bc27cb
  32. result_ptr prepare(PGconn* conn,
  33. const char* stmtName,
  34. const char* command,
  35. uint8_t nParams)
  36. {
  37. auto res = PQprepare(conn, stmtName, command, nParams, nullptr);
  38. if (PQresultStatus(res) != PGRES_COMMAND_OK)
  39. {
  40. std::cerr << "PQprepare failed: " << PQresultErrorMessage(res)
  41. << std::endl;
  42. PQclear(res);
  43. return {nullptr, nullptr};
  44. }
  45. return {res, &PQclear};
  46. }
  47. result_ptr execute(PGconn* conn,
  48. const char* stmtName,
  49. uint8_t nParams,
  50. const char* const* paramValues,
  51. const int* paramLengths = nullptr,
  52. const int* paramFormats = nullptr,
  53. int resultFormat = 0)
  54. {
  55. auto res = PQexecPrepared(conn, stmtName, nParams, paramValues, paramLengths,
  56. paramFormats, resultFormat);
  57. const auto status = PQresultStatus(res);
  58. if (status != PGRES_COMMAND_OK &&
  59. status != PGRES_TUPLES_OK &&
  60. status != PGRES_SINGLE_TUPLE)
  61. {
  62. std::cerr << "PQexecPrepared failed: " << PQresultErrorMessage(res)
  63. << std::endl;
  64. PQclear(res);
  65. return {nullptr, nullptr};
  66. }
  67. return {res, &PQclear};
  68. }
  // Looks up the environment variable `env_var`.
  // Returns its value when the variable is set (even if empty), otherwise
  // `default_value`.
  const char* env(const char* env_var, const char* default_value)
  {
      const char* const found = std::getenv(env_var);
      return found != nullptr ? found : default_value;
  }
  // Returns the current time formatted for an HTTP `Date` header, e.g.
  // "Wed, 17 Apr 2013 12:00:00 GMT".
  //
  // The time is always expressed in GMT with a fixed-width layout, so the
  // result always fits the buffer. A reentrant conversion is used because this
  // function is called from request handlers that may run on several threads.
  // Returns an empty string if the time cannot be converted or formatted.
  std::string now_string()
  {
      // Sized for exactly one date of the sample's shape plus the terminator.
      char buffer[std::size("Wed, 17 Apr 2013 12:00:00 GMT")];

      const std::time_t now = std::time(nullptr);
      std::tm utc{};
  #if defined(_WIN32)
      if (gmtime_s(&utc, &now) != 0)
          return {};
  #else
      if (gmtime_r(&now, &utc) == nullptr)
          return {};
  #endif

      // strftime reports 0 when the output did not fit; the buffer content is
      // unusable then, so only the bytes actually written are returned.
      const std::size_t written = std::strftime(
          buffer, std::size(buffer), "%a, %d %b %Y %H:%M:%S GMT", &utc);
      return std::string(buffer, written);
  }
  82. template <class Body, class Allocator>
  83. http::message_generator
  84. handle_error(
  85. http::request<Body, http::basic_fields<Allocator>>&& req,
  86. http::status status,
  87. beast::string_view msg)
  88. {
  89. http::response<http::string_body> res{status, req.version()};
  90. res.set(http::field::server, BOOST_BEAST_VERSION_STRING);
  91. res.set(http::field::content_type, "text/html");
  92. res.set(http::field::date, now_string());
  93. res.keep_alive(req.keep_alive());
  94. res.body() = std::string(msg);
  95. res.prepare_payload();
  96. return res;
  97. }
  98. template <class Body, class Allocator>
  99. http::message_generator
  100. handle_target(
  101. http::request<Body, http::basic_fields<Allocator>>&& req,
  102. PGconn* conn = nullptr)
  103. {
  104. static std::string msg = "Hello, World!";
  105. //std::cout << "handle_target: " << req.target() << std::endl;
  106. http::response<http::string_body> res{http::status::ok, req.version()};
  107. res.set(http::field::server, BOOST_BEAST_VERSION_STRING);
  108. res.set(http::field::content_type, "application/json");
  109. res.set(http::field::date, now_string());
  110. res.keep_alive(req.keep_alive());
  111. if (req.target() == "/json")
  112. {
  113. // {"message":"Hello, World!"}
  114. json::object obj;
  115. obj["message"] = msg;
  116. res.body() = json::serialize(obj);
  117. }
  118. else if (req.target() == "/plaintext")
  119. {
  120. res.set(http::field::content_type, "text/plain");
  121. res.body() = msg;
  122. }
  123. else if (req.target() == "/db" || req.target().starts_with("/queries/"))
  124. {
  125. static std::random_device rd;
  126. static std::minstd_rand gen(rd());
  127. static std::uniform_int_distribution<> distrib(1, 10000);
  128. static const char* word_query = "SELECT randomNumber FROM world WHERE id=$1";
  129. thread_local auto stmt = prepare(conn, "word_query_stmt", word_query, 1);
  130. if (req.target() == "/db")
  131. {
  132. const unsigned uint_id = distrib(gen);
  133. const auto str_id = std::to_string(uint_id);
  134. const char* char_ptr_id = str_id.c_str();
  135. if (auto rs = execute(conn, "word_query_stmt", 1, &char_ptr_id))
  136. {
  137. // {"id":3217,"randomNumber":2149}
  138. json::object obj;
  139. obj["id"] = uint_id;
  140. obj["randomNumber"] = std::atoi(PQgetvalue(rs.get(), 0, 0));
  141. res.body() = json::serialize(obj);
  142. //std::cout << "res.body(): " << res.body() << std::endl;
  143. }
  144. else
  145. return handle_error(std::move(req),
  146. http::status::internal_server_error,
  147. "internal_server_error");
  148. }
  149. else if (req.target().starts_with("/queries/"))
  150. {
  151. int n_queries = 1;
  152. try
  153. {
  154. const int n = std::stoi(req.target().substr(req.target().find_last_of('/')+1));
  155. if (n > 1) n_queries = n;
  156. } catch(...) {}
  157. if (n_queries > 500) n_queries = 500;
  158. json::array objs;
  159. for (auto i = 0; i < n_queries; ++i)
  160. {
  161. const unsigned uint_id = distrib(gen);
  162. const auto str_id = std::to_string(uint_id);
  163. const char* char_ptr_id = str_id.c_str();
  164. if (auto rs = execute(conn, "word_query_stmt", 1, &char_ptr_id))
  165. {
  166. // {"id":3217,"randomNumber":2149}
  167. json::object obj;
  168. obj["id"] = uint_id;
  169. obj["randomNumber"] = std::atoi(PQgetvalue(rs.get(), 0, 0));
  170. objs.push_back(obj);
  171. }
  172. else
  173. return handle_error(std::move(req),
  174. http::status::internal_server_error,
  175. "internal_server_error");
  176. }
  177. res.body() = json::serialize(objs);
  178. //std::cout << "res.body(): " << res.body() << std::endl;
  179. }
  180. }
  181. else
  182. {
  183. return handle_error(std::move(req),
  184. http::status::not_found,
  185. "Unhandled target: '" + std::string(req.target()) + "'");
  186. }
  187. res.prepare_payload();
  188. return res;
  189. }
  190. // Return a response for the given request.
  191. //
  192. // The concrete type of the response message (which depends on the
  193. // request), is type-erased in message_generator.
  194. template <class Body, class Allocator>
  195. http::message_generator
  196. handle_request(
  197. http::request<Body, http::basic_fields<Allocator>>&& req)
  198. {
  199. // Make sure we can handle the method
  200. if (req.method() != http::verb::get)
  201. return handle_error(std::move(req),
  202. http::status::not_found,
  203. "Unhandled method: '" + std::string(req.method_string()) + "'");
  204. if (req.target() == "/json" || req.target() == "/plaintext")
  205. return handle_target(std::move(req));
  206. thread_local std::unique_ptr<PGconn, decltype(&PQfinish)> conn
  207. {
  208. PQconnectdb(env("BCPP_PG_CONN_STR", "")),
  209. &PQfinish
  210. };
  211. if (PQstatus(conn.get()) != CONNECTION_OK)
  212. {
  213. auto msg = PQerrorMessage(conn.get());
  214. std::cerr << "PQerrorMessage: " << msg << std::endl;
  215. return handle_error(std::move(req),
  216. http::status::internal_server_error,
  217. msg);
  218. }
  219. return handle_target(std::move(req), conn.get());
  220. }
  221. }
  222. //------------------------------------------------------------------------------
  223. // Handles an HTTP server connection
  224. net::awaitable<void>
  225. do_session(tcp_stream stream)
  226. {
  227. // This buffer is required to persist across reads
  228. beast::flat_buffer buffer;
  229. // This lambda is used to send messages
  230. try
  231. {
  232. for(;;)
  233. {
  234. // Set the timeout.
  235. stream.expires_after(std::chrono::seconds(30));
  236. // Read a request
  237. http::request<http::string_body> req;
  238. co_await http::async_read(stream, buffer, req);
  239. // Handle the request
  240. http::message_generator msg =
  241. becpp::handle_request(std::move(req));
  242. // Determine if we should close the connection
  243. bool keep_alive = msg.keep_alive();
  244. // Send the response
  245. co_await beast::async_write(stream, std::move(msg), net::use_awaitable);
  246. if(! keep_alive)
  247. {
  248. // This means we should close the connection, usually because
  249. // the response indicated the "Connection: close" semantic.
  250. break;
  251. }
  252. }
  253. }
  254. catch (boost::system::system_error & se)
  255. {
  256. if (se.code() != http::error::end_of_stream )
  257. throw ;
  258. }
  259. // Send a TCP shutdown
  260. beast::error_code ec;
  261. stream.socket().shutdown(tcp::socket::shutdown_send, ec);
  262. // At this point the connection is closed gracefully
  263. // we ignore the error because the client might have
  264. // dropped the connection already.
  265. }
  266. //------------------------------------------------------------------------------
  267. // Accepts incoming connections and launches the sessions
  268. net::awaitable<void>
  269. do_listen(tcp::endpoint endpoint)
  270. {
  271. // Open the acceptor
  272. auto acceptor = net::use_awaitable.as_default_on(tcp::acceptor(co_await net::this_coro::executor));
  273. acceptor.open(endpoint.protocol());
  274. // Allow address reuse
  275. acceptor.set_option(net::socket_base::reuse_address(true));
  276. // Bind to the server address
  277. acceptor.bind(endpoint);
  278. // Start listening for connections
  279. acceptor.listen(net::socket_base::max_listen_connections);
  280. for(;;)
  281. boost::asio::co_spawn(
  282. acceptor.get_executor(),
  283. do_session(tcp_stream(co_await acceptor.async_accept())),
  284. [](std::exception_ptr e)
  285. {
  286. if (e)
  287. try
  288. {
  289. std::rethrow_exception(e);
  290. }
  291. catch (std::exception &e) {
  292. std::cerr << "Error in session: " << e.what() << "\n";
  293. }
  294. });
  295. }
  296. int main(int argc, char* argv[])
  297. {
  298. auto const address = net::ip::make_address(becpp::env("BCPP_ADDRESS", "0.0.0.0"));
  299. auto const port = static_cast<unsigned short>(std::atoi(becpp::env("BCPP_PORT", "8000")));
  300. auto const threads = std::max<int>(1, std::atoi(becpp::env("BCPP_N_THREADS", "3")));
  301. std::cout << "__GNUG__=" << __GNUG__ << '\n';
  302. std::cout << "__cplusplus=" << __cplusplus << '\n';
  303. std::cout << "__TIMESTAMP__=" << __TIMESTAMP__ << '\n';
  304. std::cout << "__GNUC_EXECUTION_CHARSET_NAME=" << __GNUC_EXECUTION_CHARSET_NAME << '\n';
  305. std::cout << "Listening " << address << ':' << port << " threads=" << threads << std::endl;
  306. // The io_context is required for all I/O
  307. net::io_context ioc{threads};
  308. // Spawn a listening port
  309. boost::asio::co_spawn(ioc,
  310. do_listen(tcp::endpoint{address, port}),
  311. [](std::exception_ptr e)
  312. {
  313. if (e)
  314. try
  315. {
  316. std::rethrow_exception(e);
  317. }
  318. catch(std::exception & e)
  319. {
  320. std::cerr << "Error in acceptor: " << e.what() << "\n";
  321. }
  322. });
  323. // Run the I/O service on the requested number of threads
  324. try {
  325. std::vector<std::thread> v;
  326. v.reserve(threads - 1);
  327. for(auto i = threads - 1; i > 0; --i)
  328. v.emplace_back(
  329. [&ioc]
  330. {
  331. ioc.run();
  332. });
  333. ioc.run();
  334. } catch (std::exception& e)
  335. {
  336. std::cerr << "Error in main: " << e.what() << "\n";
  337. }
  338. return EXIT_SUCCESS;
  339. }
  340. #else
  // Fallback entry point, compiled only when BOOST_ASIO_HAS_CO_AWAIT is not
  // defined: prints why the server cannot run and exits with status 1.
  int main(int, char * [])
  {
      std::cout << "awaitables require C++20\n";
      return 1;
  }
  346. #endif