tcptransport.cpp 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377
  1. /**
  2. * Copyright (c) 2020 Paul-Louis Ageneau
  3. *
  4. * This library is free software; you can redistribute it and/or
  5. * modify it under the terms of the GNU Lesser General Public
  6. * License as published by the Free Software Foundation; either
  7. * version 2.1 of the License, or (at your option) any later version.
  8. *
  9. * This library is distributed in the hope that it will be useful,
  10. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  12. * Lesser General Public License for more details.
  13. *
  14. * You should have received a copy of the GNU Lesser General Public
  15. * License along with this library; if not, write to the Free Software
  16. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  17. */
  18. #include "tcptransport.hpp"
  19. #include "internals.hpp"
  20. #if RTC_ENABLE_WEBSOCKET
  21. #ifndef _WIN32
  22. #include <fcntl.h>
  23. #include <unistd.h>
  24. #endif
  25. namespace rtc::impl {
  26. TcpTransport::TcpTransport(string hostname, string service, state_callback callback)
  27. : Transport(nullptr, std::move(callback)), mIsActive(true), mHostname(std::move(hostname)),
  28. mService(std::move(service)) {
  29. PLOG_DEBUG << "Initializing TCP transport";
  30. }
  31. TcpTransport::TcpTransport(socket_t sock, state_callback callback)
  32. : Transport(nullptr, std::move(callback)), mIsActive(false), mSock(sock) {
  33. PLOG_DEBUG << "Initializing TCP transport with socket";
  34. // Set non-blocking
  35. ctl_t b = 1;
  36. if (::ioctlsocket(mSock, FIONBIO, &b) < 0)
  37. throw std::runtime_error("Failed to set socket non-blocking mode");
  38. // Retrieve hostname and service
  39. struct sockaddr_storage addr;
  40. socklen_t addrlen = sizeof(addr);
  41. if (::getpeername(mSock, reinterpret_cast<struct sockaddr *>(&addr), &addrlen) < 0)
  42. throw std::runtime_error("getsockname failed");
  43. char node[MAX_NUMERICNODE_LEN];
  44. char serv[MAX_NUMERICSERV_LEN];
  45. if (::getnameinfo(reinterpret_cast<struct sockaddr *>(&addr), addrlen, node,
  46. MAX_NUMERICNODE_LEN, serv, MAX_NUMERICSERV_LEN,
  47. NI_NUMERICHOST | NI_NUMERICSERV) != 0)
  48. throw std::runtime_error("getnameinfo failed");
  49. mHostname = node;
  50. mService = serv;
  51. }
  52. TcpTransport::~TcpTransport() { stop(); }
  53. void TcpTransport::start() {
  54. Transport::start();
  55. PLOG_DEBUG << "Starting TCP recv thread";
  56. mThread = std::thread(&TcpTransport::runLoop, this);
  57. }
  58. bool TcpTransport::stop() {
  59. if (!Transport::stop())
  60. return false;
  61. PLOG_DEBUG << "Waiting for TCP recv thread";
  62. close();
  63. mThread.join();
  64. return true;
  65. }
  66. bool TcpTransport::send(message_ptr message) {
  67. std::unique_lock lock(mSockMutex);
  68. if(state() == State::Connecting)
  69. throw std::runtime_error("Connection is not open");
  70. if (state() != State::Connected)
  71. return false;
  72. if (!message)
  73. return trySendQueue();
  74. PLOG_VERBOSE << "Send size=" << (message ? message->size() : 0);
  75. return outgoing(message);
  76. }
  77. void TcpTransport::incoming(message_ptr message) {
  78. if (!message)
  79. return;
  80. PLOG_VERBOSE << "Incoming size=" << message->size();
  81. recv(message);
  82. }
  83. bool TcpTransport::outgoing(message_ptr message) {
  84. // mSockMutex must be locked
  85. // Flush the queue, and if nothing is pending, try to send directly
  86. if (trySendQueue() && trySendMessage(message))
  87. return true;
  88. mSendQueue.push(message);
  89. mInterrupter.interrupt(); // so the thread waits for writability
  90. return false;
  91. }
  92. string TcpTransport::remoteAddress() const { return mHostname + ':' + mService; }
  /**
   * Resolves hostname and service, then tries each resulting address in order
   * until one connection succeeds.
   *
   * @param hostname Host name or address to resolve
   * @param service  Service name or port to resolve
   * @throws std::runtime_error if resolution fails or no address could be
   *         connected
   */
  void TcpTransport::connect(const string &hostname, const string &service) {
      PLOG_DEBUG << "Connecting to " << hostname << ":" << service;

      struct addrinfo hints = {};
      hints.ai_family = AF_UNSPEC;
      hints.ai_socktype = SOCK_STREAM;
      hints.ai_protocol = IPPROTO_TCP;
      hints.ai_flags = AI_ADDRCONFIG;

      struct addrinfo *result = nullptr;
      if (getaddrinfo(hostname.c_str(), service.c_str(), &hints, &result))
          throw std::runtime_error("Resolution failed for \"" + hostname + ":" + service + "\"");

      bool connected = false;
      try {
          for (auto p = result; p; p = p->ai_next) {
              try {
                  connect(p->ai_addr, socklen_t(p->ai_addrlen));
                  connected = true;
                  break;
              } catch (const std::runtime_error &e) {
                  // Only the failure of the last candidate deserves a warning
                  if (p->ai_next) {
                      PLOG_DEBUG << e.what();
                  } else {
                      PLOG_WARNING << e.what();
                  }
              }
          }
      } catch (...) {
          // Anything other than std::runtime_error escapes the loop; release the
          // address list before letting it propagate so it is not leaked.
          freeaddrinfo(result);
          throw;
      }

      freeaddrinfo(result);

      if (connected) {
          PLOG_INFO << "Connected to " << hostname << ":" << service;
          return;
      }

      throw std::runtime_error("Connection to " + hostname + ":" + service + " failed");
  }
  122. void TcpTransport::connect(const sockaddr *addr, socklen_t addrlen) {
  123. std::unique_lock lock(mSockMutex);
  124. try {
  125. char node[MAX_NUMERICNODE_LEN];
  126. char serv[MAX_NUMERICSERV_LEN];
  127. if (getnameinfo(addr, addrlen, node, MAX_NUMERICNODE_LEN, serv, MAX_NUMERICSERV_LEN,
  128. NI_NUMERICHOST | NI_NUMERICSERV) == 0) {
  129. PLOG_DEBUG << "Trying address " << node << ":" << serv;
  130. }
  131. PLOG_VERBOSE << "Creating TCP socket";
  132. // Create socket
  133. mSock = ::socket(addr->sa_family, SOCK_STREAM, IPPROTO_TCP);
  134. if (mSock == INVALID_SOCKET)
  135. throw std::runtime_error("TCP socket creation failed");
  136. // Set non-blocking
  137. ctl_t b = 1;
  138. if (::ioctlsocket(mSock, FIONBIO, &b) < 0)
  139. throw std::runtime_error("Failed to set socket non-blocking mode");
  140. #ifdef __APPLE__
  141. // MacOS lacks MSG_NOSIGNAL and requires SO_NOSIGPIPE instead
  142. int opt = 1;
  143. if (::setsockopt(mSock, SOL_SOCKET, SO_NOSIGPIPE, &opt, sizeof(opt)) < 0)
  144. throw std::runtime_error("Failed to disable SIGPIPE for socket");
  145. #endif
  146. // Initiate connection
  147. int ret = ::connect(mSock, addr, addrlen);
  148. if (ret < 0 && sockerrno != SEINPROGRESS && sockerrno != SEWOULDBLOCK) {
  149. std::ostringstream msg;
  150. msg << "TCP connection to " << node << ":" << serv << " failed, errno=" << sockerrno;
  151. throw std::runtime_error(msg.str());
  152. }
  153. while (true) {
  154. fd_set writefds;
  155. FD_ZERO(&writefds);
  156. FD_SET(mSock, &writefds);
  157. struct timeval tv;
  158. tv.tv_sec = 10; // TODO: Make the timeout configurable
  159. tv.tv_usec = 0;
  160. ret = ::select(SOCKET_TO_INT(mSock) + 1, NULL, &writefds, NULL, &tv);
  161. if (ret < 0) {
  162. if (sockerrno == SEINTR || sockerrno == SEAGAIN) // interrupted
  163. continue;
  164. else
  165. throw std::runtime_error("Failed to wait for socket connection");
  166. }
  167. if (ret == 0) {
  168. std::ostringstream msg;
  169. msg << "TCP connection to " << node << ":" << serv << " timed out";
  170. throw std::runtime_error(msg.str());
  171. }
  172. int error = 0;
  173. socklen_t errorlen = sizeof(error);
  174. if (::getsockopt(mSock, SOL_SOCKET, SO_ERROR, (char *)&error, &errorlen) != 0)
  175. throw std::runtime_error("Failed to get socket error code");
  176. if (error != 0) {
  177. std::ostringstream msg;
  178. msg << "TCP connection to " << node << ":" << serv << " failed, errno=" << error;
  179. throw std::runtime_error(msg.str());
  180. }
  181. PLOG_DEBUG << "TCP connection to " << node << ":" << serv << " succeeded";
  182. break;
  183. }
  184. } catch (...) {
  185. if (mSock != INVALID_SOCKET) {
  186. ::closesocket(mSock);
  187. mSock = INVALID_SOCKET;
  188. }
  189. throw;
  190. }
  191. }
  192. void TcpTransport::close() {
  193. std::unique_lock lock(mSockMutex);
  194. if (mSock != INVALID_SOCKET) {
  195. PLOG_DEBUG << "Closing TCP socket";
  196. ::closesocket(mSock);
  197. mSock = INVALID_SOCKET;
  198. }
  199. changeState(State::Disconnected);
  200. mInterrupter.interrupt();
  201. }
  202. bool TcpTransport::trySendQueue() {
  203. // mSockMutex must be locked
  204. while (auto next = mSendQueue.peek()) {
  205. message_ptr message = std::move(*next);
  206. if (!trySendMessage(message)) {
  207. mSendQueue.exchange(message);
  208. return false;
  209. }
  210. mSendQueue.pop();
  211. }
  212. return true;
  213. }
  214. bool TcpTransport::trySendMessage(message_ptr &message) {
  215. // mSockMutex must be locked
  216. auto data = reinterpret_cast<const char *>(message->data());
  217. auto size = message->size();
  218. while (size) {
  219. #if defined(__APPLE__) || defined(_WIN32)
  220. int flags = 0;
  221. #else
  222. int flags = MSG_NOSIGNAL;
  223. #endif
  224. int len = ::send(mSock, data, int(size), flags);
  225. if (len < 0) {
  226. if (sockerrno == SEAGAIN || sockerrno == SEWOULDBLOCK) {
  227. message = make_message(message->end() - size, message->end());
  228. return false;
  229. } else {
  230. PLOG_ERROR << "Connection closed, errno=" << sockerrno;
  231. throw std::runtime_error("Connection closed");
  232. }
  233. }
  234. data += len;
  235. size -= len;
  236. }
  237. message = nullptr;
  238. return true;
  239. }
  240. void TcpTransport::runLoop() {
  241. const size_t bufferSize = 4096;
  242. // Connect
  243. try {
  244. changeState(State::Connecting);
  245. if (mSock == INVALID_SOCKET)
  246. connect(mHostname, mService);
  247. } catch (const std::exception &e) {
  248. PLOG_ERROR << "TCP connect: " << e.what();
  249. changeState(State::Failed);
  250. return;
  251. }
  252. // Receive loop
  253. try {
  254. PLOG_INFO << "TCP connected";
  255. changeState(State::Connected);
  256. while (true) {
  257. std::unique_lock lock(mSockMutex);
  258. if (mSock == INVALID_SOCKET)
  259. break;
  260. fd_set readfds, writefds;
  261. FD_ZERO(&readfds);
  262. FD_ZERO(&writefds);
  263. FD_SET(mSock, &readfds);
  264. if (!mSendQueue.empty())
  265. FD_SET(mSock, &writefds);
  266. int n = std::max(mInterrupter.prepare(readfds), SOCKET_TO_INT(mSock) + 1);
  267. struct timeval tv;
  268. tv.tv_sec = 10;
  269. tv.tv_usec = 0;
  270. lock.unlock();
  271. int ret = ::select(n, &readfds, &writefds, NULL, &tv);
  272. lock.lock();
  273. if (mSock == INVALID_SOCKET)
  274. break;
  275. if (ret < 0) {
  276. throw std::runtime_error("Failed to wait on socket");
  277. } else if (ret == 0) {
  278. PLOG_VERBOSE << "TCP is idle";
  279. lock.unlock(); // unlock now since the upper layer might send on incoming
  280. incoming(make_message(0));
  281. continue;
  282. }
  283. if (FD_ISSET(mSock, &writefds))
  284. trySendQueue();
  285. if (FD_ISSET(mSock, &readfds)) {
  286. char buffer[bufferSize];
  287. int len = ::recv(mSock, buffer, bufferSize, 0);
  288. if (len < 0) {
  289. if (sockerrno == SEAGAIN || sockerrno == SEWOULDBLOCK) {
  290. continue;
  291. } else {
  292. PLOG_WARNING << "TCP connection lost";
  293. break;
  294. }
  295. }
  296. if (len == 0)
  297. break; // clean close
  298. lock.unlock(); // unlock now since the upper layer might send on incoming
  299. auto *b = reinterpret_cast<byte *>(buffer);
  300. incoming(make_message(b, b + len));
  301. }
  302. }
  303. } catch (const std::exception &e) {
  304. PLOG_ERROR << "TCP recv: " << e.what();
  305. }
  306. PLOG_INFO << "TCP disconnected";
  307. changeState(State::Disconnected);
  308. recv(nullptr);
  309. }
  310. } // namespace rtc::impl
  311. #endif