tcptransport.cpp 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404
  1. /**
  2. * Copyright (c) 2020 Paul-Louis Ageneau
  3. *
  4. * This library is free software; you can redistribute it and/or
  5. * modify it under the terms of the GNU Lesser General Public
  6. * License as published by the Free Software Foundation; either
  7. * version 2.1 of the License, or (at your option) any later version.
  8. *
  9. * This library is distributed in the hope that it will be useful,
  10. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  12. * Lesser General Public License for more details.
  13. *
  14. * You should have received a copy of the GNU Lesser General Public
  15. * License along with this library; if not, write to the Free Software
  16. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  17. */
  18. #include "tcptransport.hpp"
  19. #include "globals.hpp"
  20. #if RTC_ENABLE_WEBSOCKET
  21. #include <exception>
  22. #ifndef _WIN32
  23. #include <fcntl.h>
  24. #include <unistd.h>
  25. #endif
  26. namespace rtc::impl {
  27. using std::to_string;
  28. SelectInterrupter::SelectInterrupter() {
  29. #ifndef _WIN32
  30. int pipefd[2];
  31. if (::pipe(pipefd) != 0)
  32. throw std::runtime_error("Failed to create pipe");
  33. ::fcntl(pipefd[0], F_SETFL, O_NONBLOCK);
  34. ::fcntl(pipefd[1], F_SETFL, O_NONBLOCK);
  35. mPipeOut = pipefd[1]; // read
  36. mPipeIn = pipefd[0]; // write
  37. #endif
  38. }
  39. SelectInterrupter::~SelectInterrupter() {
  40. std::lock_guard lock(mMutex);
  41. #ifdef _WIN32
  42. if (mDummySock != INVALID_SOCKET)
  43. ::closesocket(mDummySock);
  44. #else
  45. ::close(mPipeIn);
  46. ::close(mPipeOut);
  47. #endif
  48. }
  49. int SelectInterrupter::prepare(fd_set &readfds, [[maybe_unused]] fd_set &writefds) {
  50. std::lock_guard lock(mMutex);
  51. #ifdef _WIN32
  52. if (mDummySock == INVALID_SOCKET)
  53. mDummySock = ::socket(AF_INET, SOCK_DGRAM, 0);
  54. FD_SET(mDummySock, &readfds);
  55. return SOCKET_TO_INT(mDummySock) + 1;
  56. #else
  57. char dummy;
  58. (void)::read(mPipeIn, &dummy, 1);
  59. FD_SET(mPipeIn, &readfds);
  60. return mPipeIn + 1;
  61. #endif
  62. }
  63. void SelectInterrupter::interrupt() {
  64. std::lock_guard lock(mMutex);
  65. #ifdef _WIN32
  66. if (mDummySock != INVALID_SOCKET) {
  67. ::closesocket(mDummySock);
  68. mDummySock = INVALID_SOCKET;
  69. }
  70. #else
  71. char dummy = 0;
  72. (void)::write(mPipeOut, &dummy, 1);
  73. #endif
  74. }
  75. TcpTransport::TcpTransport(const string &hostname, const string &service, state_callback callback)
  76. : Transport(nullptr, std::move(callback)), mHostname(hostname), mService(service) {
  77. PLOG_DEBUG << "Initializing TCP transport";
  78. }
  79. TcpTransport::~TcpTransport() { stop(); }
  80. void TcpTransport::start() {
  81. Transport::start();
  82. PLOG_DEBUG << "Starting TCP recv thread";
  83. mThread = std::thread(&TcpTransport::runLoop, this);
  84. }
  85. bool TcpTransport::stop() {
  86. if (!Transport::stop())
  87. return false;
  88. PLOG_DEBUG << "Waiting for TCP recv thread";
  89. close();
  90. mThread.join();
  91. return true;
  92. }
  93. bool TcpTransport::send(message_ptr message) {
  94. std::unique_lock lock(mSockMutex);
  95. if (state() != State::Connected)
  96. return false;
  97. if (!message)
  98. return trySendQueue();
  99. PLOG_VERBOSE << "Send size=" << (message ? message->size() : 0);
  100. return outgoing(message);
  101. }
  102. void TcpTransport::incoming(message_ptr message) {
  103. if (!message)
  104. return;
  105. PLOG_VERBOSE << "Incoming size=" << message->size();
  106. recv(message);
  107. }
  108. bool TcpTransport::outgoing(message_ptr message) {
  109. // mSockMutex must be locked
  110. // Flush the queue, and if nothing is pending, try to send directly
  111. if (trySendQueue() && trySendMessage(message))
  112. return true;
  113. mSendQueue.push(message);
  114. interruptSelect(); // so the thread waits for writability
  115. return false;
  116. }
  117. void TcpTransport::connect(const string &hostname, const string &service) {
  118. PLOG_DEBUG << "Connecting to " << hostname << ":" << service;
  119. struct addrinfo hints = {};
  120. hints.ai_family = AF_UNSPEC;
  121. hints.ai_socktype = SOCK_STREAM;
  122. hints.ai_protocol = IPPROTO_TCP;
  123. hints.ai_flags = AI_ADDRCONFIG;
  124. struct addrinfo *result = nullptr;
  125. if (getaddrinfo(hostname.c_str(), service.c_str(), &hints, &result))
  126. throw std::runtime_error("Resolution failed for \"" + hostname + ":" + service + "\"");
  127. for (auto p = result; p; p = p->ai_next) {
  128. try {
  129. connect(p->ai_addr, socklen_t(p->ai_addrlen));
  130. PLOG_INFO << "Connected to " << hostname << ":" << service;
  131. freeaddrinfo(result);
  132. return;
  133. } catch (const std::runtime_error &e) {
  134. if (p->ai_next) {
  135. PLOG_DEBUG << e.what();
  136. } else {
  137. PLOG_WARNING << e.what();
  138. }
  139. }
  140. }
  141. freeaddrinfo(result);
  142. std::ostringstream msg;
  143. msg << "Connection to " << hostname << ":" << service << " failed";
  144. throw std::runtime_error(msg.str());
  145. }
  146. void TcpTransport::connect(const sockaddr *addr, socklen_t addrlen) {
  147. std::unique_lock lock(mSockMutex);
  148. try {
  149. char node[MAX_NUMERICNODE_LEN];
  150. char serv[MAX_NUMERICSERV_LEN];
  151. if (getnameinfo(addr, addrlen, node, MAX_NUMERICNODE_LEN, serv, MAX_NUMERICSERV_LEN,
  152. NI_NUMERICHOST | NI_NUMERICSERV) == 0) {
  153. PLOG_DEBUG << "Trying address " << node << ":" << serv;
  154. }
  155. PLOG_VERBOSE << "Creating TCP socket";
  156. // Create socket
  157. mSock = ::socket(addr->sa_family, SOCK_STREAM, IPPROTO_TCP);
  158. if (mSock == INVALID_SOCKET)
  159. throw std::runtime_error("TCP socket creation failed");
  160. ctl_t b = 1;
  161. if (::ioctlsocket(mSock, FIONBIO, &b) < 0)
  162. throw std::runtime_error("Failed to set socket non-blocking mode");
  163. #ifdef __APPLE__
  164. // MacOS lacks MSG_NOSIGNAL and requires SO_NOSIGPIPE instead
  165. int opt = 1;
  166. if (::setsockopt(mSock, SOL_SOCKET, SO_NOSIGPIPE, &opt, sizeof(opt)) < 0)
  167. throw std::runtime_error("Failed to disable SIGPIPE for socket");
  168. #endif
  169. // Initiate connection
  170. int ret = ::connect(mSock, addr, addrlen);
  171. if (ret < 0 && sockerrno != SEINPROGRESS && sockerrno != SEWOULDBLOCK) {
  172. std::ostringstream msg;
  173. msg << "TCP connection to " << node << ":" << serv << " failed, errno=" << sockerrno;
  174. throw std::runtime_error(msg.str());
  175. }
  176. while (true) {
  177. fd_set writefds;
  178. FD_ZERO(&writefds);
  179. FD_SET(mSock, &writefds);
  180. struct timeval tv;
  181. tv.tv_sec = 10; // TODO: Make the timeout configurable
  182. tv.tv_usec = 0;
  183. ret = ::select(SOCKET_TO_INT(mSock) + 1, NULL, &writefds, NULL, &tv);
  184. if (ret < 0) {
  185. if (sockerrno == SEINTR || sockerrno == SEAGAIN) // interrupted
  186. continue;
  187. else
  188. throw std::runtime_error("Failed to wait for socket connection");
  189. }
  190. if (ret == 0) {
  191. std::ostringstream msg;
  192. msg << "TCP connection to " << node << ":" << serv << " timed out";
  193. throw std::runtime_error(msg.str());
  194. }
  195. int error = 0;
  196. socklen_t errorlen = sizeof(error);
  197. if (::getsockopt(mSock, SOL_SOCKET, SO_ERROR, (char *)&error, &errorlen) != 0)
  198. throw std::runtime_error("Failed to get socket error code");
  199. if (error != 0) {
  200. std::ostringstream msg;
  201. msg << "TCP connection to " << node << ":" << serv << " failed, errno=" << error;
  202. throw std::runtime_error(msg.str());
  203. }
  204. PLOG_DEBUG << "TCP connection to " << node << ":" << serv << " succeeded";
  205. break;
  206. }
  207. } catch (...) {
  208. if (mSock != INVALID_SOCKET) {
  209. ::closesocket(mSock);
  210. mSock = INVALID_SOCKET;
  211. }
  212. throw;
  213. }
  214. }
  215. void TcpTransport::close() {
  216. std::unique_lock lock(mSockMutex);
  217. if (mSock != INVALID_SOCKET) {
  218. PLOG_DEBUG << "Closing TCP socket";
  219. ::closesocket(mSock);
  220. mSock = INVALID_SOCKET;
  221. }
  222. changeState(State::Disconnected);
  223. interruptSelect();
  224. }
  225. bool TcpTransport::trySendQueue() {
  226. // mSockMutex must be locked
  227. while (auto next = mSendQueue.peek()) {
  228. message_ptr message = std::move(*next);
  229. if (!trySendMessage(message)) {
  230. mSendQueue.exchange(message);
  231. return false;
  232. }
  233. mSendQueue.pop();
  234. }
  235. return true;
  236. }
  237. bool TcpTransport::trySendMessage(message_ptr &message) {
  238. // mSockMutex must be locked
  239. auto data = reinterpret_cast<const char *>(message->data());
  240. auto size = message->size();
  241. while (size) {
  242. #if defined(__APPLE__) || defined(_WIN32)
  243. int flags = 0;
  244. #else
  245. int flags = MSG_NOSIGNAL;
  246. #endif
  247. int len = ::send(mSock, data, int(size), flags);
  248. if (len < 0) {
  249. if (sockerrno == SEAGAIN || sockerrno == SEWOULDBLOCK) {
  250. message = make_message(message->end() - size, message->end());
  251. return false;
  252. } else {
  253. throw std::runtime_error("Connection lost, errno=" + to_string(sockerrno));
  254. }
  255. }
  256. data += len;
  257. size -= len;
  258. }
  259. message = nullptr;
  260. return true;
  261. }
  262. void TcpTransport::runLoop() {
  263. const size_t bufferSize = 4096;
  264. // Connect
  265. try {
  266. changeState(State::Connecting);
  267. connect(mHostname, mService);
  268. } catch (const std::exception &e) {
  269. PLOG_ERROR << "TCP connect: " << e.what();
  270. changeState(State::Failed);
  271. return;
  272. }
  273. // Receive loop
  274. try {
  275. PLOG_INFO << "TCP connected";
  276. changeState(State::Connected);
  277. while (true) {
  278. std::unique_lock lock(mSockMutex);
  279. if (mSock == INVALID_SOCKET)
  280. break;
  281. fd_set readfds, writefds;
  282. int n = prepareSelect(readfds, writefds);
  283. struct timeval tv;
  284. tv.tv_sec = 10;
  285. tv.tv_usec = 0;
  286. lock.unlock();
  287. int ret = ::select(n, &readfds, &writefds, NULL, &tv);
  288. lock.lock();
  289. if (mSock == INVALID_SOCKET)
  290. break;
  291. if (ret < 0) {
  292. throw std::runtime_error("Failed to wait on socket");
  293. } else if (ret == 0) {
  294. PLOG_VERBOSE << "TCP is idle";
  295. lock.unlock(); // unlock now since the upper layer might send on incoming
  296. incoming(make_message(0));
  297. continue;
  298. }
  299. if (FD_ISSET(mSock, &writefds))
  300. trySendQueue();
  301. if (FD_ISSET(mSock, &readfds)) {
  302. char buffer[bufferSize];
  303. int len = ::recv(mSock, buffer, bufferSize, 0);
  304. if (len < 0) {
  305. if (sockerrno == SEAGAIN || sockerrno == SEWOULDBLOCK) {
  306. continue;
  307. } else {
  308. throw std::runtime_error("Connection lost");
  309. }
  310. }
  311. if (len == 0)
  312. break; // clean close
  313. lock.unlock(); // unlock now since the upper layer might send on incoming
  314. auto *b = reinterpret_cast<byte *>(buffer);
  315. incoming(make_message(b, b + len));
  316. }
  317. }
  318. } catch (const std::exception &e) {
  319. PLOG_ERROR << "TCP recv: " << e.what();
  320. }
  321. PLOG_INFO << "TCP disconnected";
  322. changeState(State::Disconnected);
  323. recv(nullptr);
  324. }
  325. int TcpTransport::prepareSelect(fd_set &readfds, fd_set &writefds) {
  326. FD_ZERO(&readfds);
  327. FD_ZERO(&writefds);
  328. FD_SET(mSock, &readfds);
  329. if (!mSendQueue.empty())
  330. FD_SET(mSock, &writefds);
  331. int n = SOCKET_TO_INT(mSock) + 1;
  332. int m = mInterrupter.prepare(readfds, writefds);
  333. return std::max(n, m);
  334. }
  335. void TcpTransport::interruptSelect() { mInterrupter.interrupt(); }
  336. } // namespace rtc::impl
  337. #endif