websocket.cpp 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492
  1. /**
  2. * Copyright (c) 2020-2021 Paul-Louis Ageneau
  3. *
  4. * This Source Code Form is subject to the terms of the Mozilla Public
  5. * License, v. 2.0. If a copy of the MPL was not distributed with this
  6. * file, You can obtain one at https://mozilla.org/MPL/2.0/.
  7. */
  8. #if RTC_ENABLE_WEBSOCKET
  9. #include "websocket.hpp"
  10. #include "common.hpp"
  11. #include "internals.hpp"
  12. #include "processor.hpp"
  13. #include "utils.hpp"
  14. #include "tcptransport.hpp"
  15. #include "tcpproxytransport.hpp"
  16. #include "tlstransport.hpp"
  17. #include "verifiedtlstransport.hpp"
  18. #include "wstransport.hpp"
  19. #include <array>
  20. #include <chrono>
  21. #include <regex>
  22. #ifdef _WIN32
  23. #include <winsock2.h>
  24. #endif
  25. namespace rtc::impl {
  26. using namespace std::placeholders;
  27. using namespace std::chrono_literals;
  28. WebSocket::WebSocket(optional<Configuration> optConfig, certificate_ptr certificate)
  29. : config(optConfig ? std::move(*optConfig) : Configuration()),
  30. mCertificate(std::move(certificate)), mIsSecure(mCertificate != nullptr),
  31. mRecvQueue(RECV_QUEUE_LIMIT, message_size_func) {
  32. PLOG_VERBOSE << "Creating WebSocket";
  33. }
  34. WebSocket::~WebSocket() { PLOG_VERBOSE << "Destroying WebSocket"; }
  35. void WebSocket::open(const string &url) {
  36. PLOG_VERBOSE << "Opening WebSocket to URL: " << url;
  37. if (state != State::Closed)
  38. throw std::logic_error("WebSocket must be closed before opening");
  39. if (config.proxyServer) {
  40. mIsProxied = true;
  41. }
  42. // Modified regex from RFC 3986, see https://www.rfc-editor.org/rfc/rfc3986.html#appendix-B
  43. static const char *rs =
  44. R"(^(([^:.@/?#]+):)?(/{0,2}((([^:@]*)(:([^@]*))?)@)?(([^:/?#]*)(:([^/?#]*))?))?([^?#]*)(\?([^#]*))?(#(.*))?)";
  45. static const std::regex r(rs, std::regex::extended);
  46. std::smatch m;
  47. if (!std::regex_match(url, m, r) || m[10].length() == 0)
  48. throw std::invalid_argument("Invalid WebSocket URL: " + url);
  49. string scheme = m[2];
  50. if (scheme.empty())
  51. scheme = "ws";
  52. if (scheme != "ws" && scheme != "wss")
  53. throw std::invalid_argument("Invalid WebSocket scheme: " + scheme);
  54. mIsSecure = (scheme != "ws");
  55. string username = utils::url_decode(m[6]);
  56. string password = utils::url_decode(m[8]);
  57. if (!username.empty() || !password.empty()) {
  58. PLOG_WARNING << "HTTP authentication support for WebSocket is not implemented";
  59. }
  60. string host;
  61. string hostname = m[10];
  62. string service = m[12];
  63. if (service.empty()) {
  64. service = mIsSecure ? "443" : "80";
  65. host = hostname;
  66. } else {
  67. host = hostname + ':' + service;
  68. }
  69. if (hostname.front() == '[' && hostname.back() == ']') {
  70. // IPv6 literal
  71. hostname.erase(hostname.begin());
  72. hostname.pop_back();
  73. } else {
  74. hostname = utils::url_decode(hostname);
  75. }
  76. string path = m[13];
  77. if (path.empty())
  78. path += '/';
  79. if (string query = m[15]; !query.empty())
  80. path += "?" + query;
  81. mHostname = hostname; // for TLS SNI
  82. mService = service; //For proxy
  83. std::atomic_store(&mWsHandshake, std::make_shared<WsHandshake>(host, path, config.protocols));
  84. changeState(State::Connecting);
  85. if (mIsProxied)
  86. {
  87. //TODO catch bad convert
  88. setTcpTransport(std::make_shared<TcpTransport>(mProxy.value().hostname, std::to_string(mProxy.value().port), nullptr));
  89. }
  90. else
  91. {
  92. setTcpTransport(std::make_shared<TcpTransport>(hostname, service, nullptr));
  93. }
  94. }
  95. void WebSocket::close() {
  96. auto s = state.load();
  97. if (s == State::Connecting || s == State::Open) {
  98. PLOG_VERBOSE << "Closing WebSocket";
  99. changeState(State::Closing);
  100. if (auto transport = std::atomic_load(&mWsTransport))
  101. transport->stop();
  102. else
  103. remoteClose();
  104. }
  105. }
  106. void WebSocket::remoteClose() {
  107. close();
  108. if (state.load() != State::Closed)
  109. closeTransports();
  110. }
  111. bool WebSocket::isOpen() const { return state == State::Open; }
  112. bool WebSocket::isClosed() const { return state == State::Closed; }
  113. size_t WebSocket::maxMessageSize() const { return DEFAULT_MAX_MESSAGE_SIZE; }
  114. optional<message_variant> WebSocket::receive() {
  115. while (auto next = mRecvQueue.pop()) {
  116. message_ptr message = *next;
  117. if (message->type != Message::Control)
  118. return to_variant(std::move(*message));
  119. }
  120. return nullopt;
  121. }
  122. optional<message_variant> WebSocket::peek() {
  123. while (auto next = mRecvQueue.peek()) {
  124. message_ptr message = *next;
  125. if (message->type != Message::Control)
  126. return to_variant(std::move(*message));
  127. mRecvQueue.pop();
  128. }
  129. return nullopt;
  130. }
  131. size_t WebSocket::availableAmount() const { return mRecvQueue.amount(); }
  132. bool WebSocket::changeState(State newState) { return state.exchange(newState) != newState; }
  133. bool WebSocket::outgoing(message_ptr message) {
  134. if (state != State::Open || !mWsTransport)
  135. throw std::runtime_error("WebSocket is not open");
  136. if (message->size() > maxMessageSize())
  137. throw std::runtime_error("Message size exceeds limit");
  138. return mWsTransport->send(message);
  139. }
  140. void WebSocket::incoming(message_ptr message) {
  141. if (!message) {
  142. remoteClose();
  143. return;
  144. }
  145. if (message->type == Message::String || message->type == Message::Binary) {
  146. mRecvQueue.push(message);
  147. triggerAvailable(mRecvQueue.size());
  148. }
  149. }
  150. // Helper for WebSocket::initXTransport methods: start and emplace the transport
  151. template <typename T>
  152. shared_ptr<T> emplaceTransport(WebSocket *ws, shared_ptr<T> *member, shared_ptr<T> transport) {
  153. std::atomic_store(member, transport);
  154. try {
  155. transport->start();
  156. } catch (...) {
  157. std::atomic_store(member, decltype(transport)(nullptr));
  158. transport->stop();
  159. throw;
  160. }
  161. if (ws->state == WebSocket::State::Closed) {
  162. std::atomic_store(member, decltype(transport)(nullptr));
  163. transport->stop();
  164. return nullptr;
  165. }
  166. return transport;
  167. }
  168. shared_ptr<TcpTransport> WebSocket::setTcpTransport(shared_ptr<TcpTransport> transport) {
  169. PLOG_VERBOSE << "Starting TCP transport";
  170. if (!transport)
  171. throw std::logic_error("TCP transport is null");
  172. using State = TcpTransport::State;
  173. try {
  174. if (std::atomic_load(&mTcpTransport))
  175. throw std::logic_error("TCP transport is already set");
  176. transport->onBufferedAmount(weak_bind(&WebSocket::triggerBufferedAmount, this, _1));
  177. transport->onStateChange([this, weak_this = weak_from_this()](State transportState) {
  178. auto shared_this = weak_this.lock();
  179. if (!shared_this)
  180. return;
  181. switch (transportState) {
  182. case State::Connected:
  183. if (mIsProxied)
  184. initProxyTransport();
  185. else if (mIsSecure)
  186. initTlsTransport();
  187. else
  188. initWsTransport();
  189. break;
  190. case State::Failed:
  191. triggerError("TCP connection failed");
  192. remoteClose();
  193. break;
  194. case State::Disconnected:
  195. remoteClose();
  196. break;
  197. default:
  198. // Ignore
  199. break;
  200. }
  201. });
  202. // WS transport sends a ping on read timeout
  203. auto pingInterval = config.pingInterval.value_or(10000ms);
  204. if (pingInterval > std::chrono::milliseconds::zero())
  205. transport->setReadTimeout(pingInterval);
  206. return emplaceTransport(this, &mTcpTransport, std::move(transport));
  207. } catch (const std::exception &e) {
  208. PLOG_ERROR << e.what();
  209. remoteClose();
  210. throw std::runtime_error("TCP transport initialization failed");
  211. }
  212. }
  213. shared_ptr<TcpProxyTransport> WebSocket::initProxyTransport() {
  214. PLOG_VERBOSE << "Starting Tcp Proxy transport";
  215. using State = TcpProxyTransport::State;
  216. try {
  217. if (auto transport = std::atomic_load(&mProxyTransport))
  218. return transport;
  219. auto lower = std::atomic_load(&mTcpTransport);
  220. if (!lower)
  221. throw std::logic_error("No underlying TCP transport for Proxy transport");
  222. auto stateChangeCallback = [this, weak_this = weak_from_this()](State transportState) {
  223. auto shared_this = weak_this.lock();
  224. if (!shared_this)
  225. return;
  226. switch (transportState) {
  227. case State::Connected:
  228. if (mIsSecure)
  229. initTlsTransport();
  230. else
  231. initWsTransport();
  232. break;
  233. case State::Failed:
  234. triggerError("Proxy connection failed");
  235. remoteClose();
  236. break;
  237. case State::Disconnected:
  238. remoteClose();
  239. break;
  240. default:
  241. // Ignore
  242. break;
  243. }
  244. };
  245. //TODO check optionals?
  246. auto transport = std::make_shared<TcpProxyTransport>( lower, mHostname.value(), mService.value(), stateChangeCallback );
  247. return emplaceTransport(this, &mProxyTransport, std::move(transport));
  248. } catch (const std::exception &e) {
  249. PLOG_ERROR << e.what();
  250. remoteClose();
  251. throw std::runtime_error("Tcp Proxy transport initialization failed");
  252. }
  253. }
  254. shared_ptr<TlsTransport> WebSocket::initTlsTransport() {
  255. PLOG_VERBOSE << "Starting TLS transport";
  256. using State = TlsTransport::State;
  257. try {
  258. if (auto transport = std::atomic_load(&mTlsTransport))
  259. return transport;
  260. auto lower = std::atomic_load(&mTcpTransport);
  261. if (!lower)
  262. throw std::logic_error("No underlying TCP transport for TLS transport");
  263. auto stateChangeCallback = [this, weak_this = weak_from_this()](State transportState) {
  264. auto shared_this = weak_this.lock();
  265. if (!shared_this)
  266. return;
  267. switch (transportState) {
  268. case State::Connected:
  269. initWsTransport();
  270. break;
  271. case State::Failed:
  272. triggerError("TLS connection failed");
  273. remoteClose();
  274. break;
  275. case State::Disconnected:
  276. remoteClose();
  277. break;
  278. default:
  279. // Ignore
  280. break;
  281. }
  282. };
  283. bool verify = mHostname.has_value() && !config.disableTlsVerification;
  284. #ifdef _WIN32
  285. if (std::exchange(verify, false)) {
  286. PLOG_WARNING << "TLS certificate verification with root CA is not supported on Windows";
  287. }
  288. #endif
  289. shared_ptr<TlsTransport> transport;
  290. if (verify)
  291. transport = std::make_shared<VerifiedTlsTransport>(lower, mHostname.value(),
  292. mCertificate, stateChangeCallback);
  293. else
  294. transport =
  295. std::make_shared<TlsTransport>(lower, mHostname, mCertificate, stateChangeCallback);
  296. return emplaceTransport(this, &mTlsTransport, std::move(transport));
  297. } catch (const std::exception &e) {
  298. PLOG_ERROR << e.what();
  299. remoteClose();
  300. throw std::runtime_error("TLS transport initialization failed");
  301. }
  302. }
  303. shared_ptr<WsTransport> WebSocket::initWsTransport() {
  304. PLOG_VERBOSE << "Starting WebSocket transport";
  305. using State = WsTransport::State;
  306. try {
  307. if (auto transport = std::atomic_load(&mWsTransport))
  308. return transport;
  309. variant<shared_ptr<TcpTransport>, shared_ptr<TlsTransport>> lower;
  310. if (mIsSecure) {
  311. auto transport = std::atomic_load(&mTlsTransport);
  312. if (!transport)
  313. throw std::logic_error("No underlying TLS transport for WebSocket transport");
  314. lower = transport;
  315. } else {
  316. auto transport = std::atomic_load(&mTcpTransport);
  317. if (!transport)
  318. throw std::logic_error("No underlying TCP transport for WebSocket transport");
  319. lower = transport;
  320. }
  321. if (!atomic_load(&mWsHandshake))
  322. atomic_store(&mWsHandshake, std::make_shared<WsHandshake>());
  323. auto stateChangeCallback = [this, weak_this = weak_from_this()](State transportState) {
  324. auto shared_this = weak_this.lock();
  325. if (!shared_this)
  326. return;
  327. switch (transportState) {
  328. case State::Connected:
  329. if (state == WebSocket::State::Connecting) {
  330. PLOG_DEBUG << "WebSocket open";
  331. if (changeState(WebSocket::State::Open))
  332. triggerOpen();
  333. }
  334. break;
  335. case State::Failed:
  336. triggerError("WebSocket connection failed");
  337. remoteClose();
  338. break;
  339. case State::Disconnected:
  340. remoteClose();
  341. break;
  342. default:
  343. // Ignore
  344. break;
  345. }
  346. };
  347. auto maxOutstandingPings = config.maxOutstandingPings.value_or(0);
  348. auto transport = std::make_shared<WsTransport>(lower, mWsHandshake, maxOutstandingPings,
  349. weak_bind(&WebSocket::incoming, this, _1),
  350. stateChangeCallback);
  351. return emplaceTransport(this, &mWsTransport, std::move(transport));
  352. } catch (const std::exception &e) {
  353. PLOG_ERROR << e.what();
  354. remoteClose();
  355. throw std::runtime_error("WebSocket transport initialization failed");
  356. }
  357. }
  358. shared_ptr<TcpTransport> WebSocket::getTcpTransport() const {
  359. return std::atomic_load(&mTcpTransport);
  360. }
  361. shared_ptr<TlsTransport> WebSocket::getTlsTransport() const {
  362. return std::atomic_load(&mTlsTransport);
  363. }
  364. shared_ptr<WsTransport> WebSocket::getWsTransport() const {
  365. return std::atomic_load(&mWsTransport);
  366. }
  367. shared_ptr<WsHandshake> WebSocket::getWsHandshake() const {
  368. return std::atomic_load(&mWsHandshake);
  369. }
  370. void WebSocket::closeTransports() {
  371. PLOG_VERBOSE << "Closing transports";
  372. if (!changeState(State::Closed))
  373. return; // already closed
  374. // Pass the pointers to a thread, allowing to terminate a transport from its own thread
  375. auto ws = std::atomic_exchange(&mWsTransport, decltype(mWsTransport)(nullptr));
  376. auto tls = std::atomic_exchange(&mTlsTransport, decltype(mTlsTransport)(nullptr));
  377. auto tcp = std::atomic_exchange(&mTcpTransport, decltype(mTcpTransport)(nullptr));
  378. if (ws)
  379. ws->onRecv(nullptr);
  380. if (tcp)
  381. tcp->onBufferedAmount(nullptr);
  382. using array = std::array<shared_ptr<Transport>, 3>;
  383. array transports{std::move(ws), std::move(tls), std::move(tcp)};
  384. for (const auto &t : transports)
  385. if (t)
  386. t->onStateChange(nullptr);
  387. TearDownProcessor::Instance().enqueue(
  388. [transports = std::move(transports), token = Init::Instance().token()]() mutable {
  389. for (const auto &t : transports) {
  390. if (t) {
  391. t->stop();
  392. break;
  393. }
  394. }
  395. for (auto &t : transports)
  396. t.reset();
  397. });
  398. triggerClosed();
  399. }
  400. } // namespace rtc::impl
  401. #endif