tcptransport.cpp 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403
  1. /**
  2. * Copyright (c) 2020 Paul-Louis Ageneau
  3. *
  4. * This library is free software; you can redistribute it and/or
  5. * modify it under the terms of the GNU Lesser General Public
  6. * License as published by the Free Software Foundation; either
  7. * version 2.1 of the License, or (at your option) any later version.
  8. *
  9. * This library is distributed in the hope that it will be useful,
  10. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  12. * Lesser General Public License for more details.
  13. *
  14. * You should have received a copy of the GNU Lesser General Public
  15. * License along with this library; if not, write to the Free Software
  16. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  17. */
  18. #include "tcptransport.hpp"
  19. #if RTC_ENABLE_WEBSOCKET
  20. #include <exception>
  21. #ifndef _WIN32
  22. #include <fcntl.h>
  23. #include <unistd.h>
  24. #endif
  25. namespace rtc {
  26. using std::to_string;
  27. SelectInterrupter::SelectInterrupter() {
  28. #ifndef _WIN32
  29. int pipefd[2];
  30. if (::pipe(pipefd) != 0)
  31. throw std::runtime_error("Failed to create pipe");
  32. ::fcntl(pipefd[0], F_SETFL, O_NONBLOCK);
  33. ::fcntl(pipefd[1], F_SETFL, O_NONBLOCK);
  34. mPipeOut = pipefd[1]; // read
  35. mPipeIn = pipefd[0]; // write
  36. #endif
  37. }
  38. SelectInterrupter::~SelectInterrupter() {
  39. std::lock_guard lock(mMutex);
  40. #ifdef _WIN32
  41. if (mDummySock != INVALID_SOCKET)
  42. ::closesocket(mDummySock);
  43. #else
  44. ::close(mPipeIn);
  45. ::close(mPipeOut);
  46. #endif
  47. }
  48. int SelectInterrupter::prepare(fd_set &readfds, [[maybe_unused]] fd_set &writefds) {
  49. std::lock_guard lock(mMutex);
  50. #ifdef _WIN32
  51. if (mDummySock == INVALID_SOCKET)
  52. mDummySock = ::socket(AF_INET, SOCK_DGRAM, 0);
  53. FD_SET(mDummySock, &readfds);
  54. return SOCKET_TO_INT(mDummySock) + 1;
  55. #else
  56. char dummy;
  57. (void)::read(mPipeIn, &dummy, 1);
  58. FD_SET(mPipeIn, &readfds);
  59. return mPipeIn + 1;
  60. #endif
  61. }
  62. void SelectInterrupter::interrupt() {
  63. std::lock_guard lock(mMutex);
  64. #ifdef _WIN32
  65. if (mDummySock != INVALID_SOCKET) {
  66. ::closesocket(mDummySock);
  67. mDummySock = INVALID_SOCKET;
  68. }
  69. #else
  70. char dummy = 0;
  71. (void)::write(mPipeOut, &dummy, 1);
  72. #endif
  73. }
  74. TcpTransport::TcpTransport(const string &hostname, const string &service, state_callback callback)
  75. : Transport(nullptr, std::move(callback)), mHostname(hostname), mService(service) {
  76. PLOG_DEBUG << "Initializing TCP transport";
  77. }
  78. TcpTransport::~TcpTransport() { stop(); }
  79. void TcpTransport::start() {
  80. Transport::start();
  81. PLOG_DEBUG << "Starting TCP recv thread";
  82. mThread = std::thread(&TcpTransport::runLoop, this);
  83. }
  84. bool TcpTransport::stop() {
  85. if (!Transport::stop())
  86. return false;
  87. PLOG_DEBUG << "Waiting for TCP recv thread";
  88. close();
  89. mThread.join();
  90. return true;
  91. }
  92. bool TcpTransport::send(message_ptr message) {
  93. std::unique_lock lock(mSockMutex);
  94. if (state() != State::Connected)
  95. return false;
  96. if (!message)
  97. return trySendQueue();
  98. PLOG_VERBOSE << "Send size=" << (message ? message->size() : 0);
  99. return outgoing(message);
  100. }
  101. void TcpTransport::incoming(message_ptr message) {
  102. if (!message)
  103. return;
  104. PLOG_VERBOSE << "Incoming size=" << message->size();
  105. recv(message);
  106. }
  107. bool TcpTransport::outgoing(message_ptr message) {
  108. // mSockMutex must be locked
  109. // Flush the queue, and if nothing is pending, try to send directly
  110. if (trySendQueue() && trySendMessage(message))
  111. return true;
  112. mSendQueue.push(message);
  113. interruptSelect(); // so the thread waits for writability
  114. return false;
  115. }
  // Resolves hostname:service and tries each resulting address in order until one connects.
  // Throws std::runtime_error if resolution fails or if no address could be connected.
  void TcpTransport::connect(const string &hostname, const string &service) {
      PLOG_DEBUG << "Connecting to " << hostname << ":" << service;

      struct addrinfo hints = {};
      hints.ai_flags = AI_ADDRCONFIG;
      hints.ai_family = AF_UNSPEC;
      hints.ai_socktype = SOCK_STREAM;
      hints.ai_protocol = IPPROTO_TCP;

      struct addrinfo *candidates = nullptr;
      const int status = getaddrinfo(hostname.c_str(), service.c_str(), &hints, &candidates);
      if (status != 0)
          throw std::runtime_error("Resolution failed for \"" + hostname + ":" + service + "\"");

      struct addrinfo *current = candidates;
      while (current) {
          try {
              connect(current->ai_addr, socklen_t(current->ai_addrlen));

              PLOG_INFO << "Connected to " << hostname << ":" << service;
              freeaddrinfo(candidates);
              return;
          } catch (const std::runtime_error &e) {
              // Only the failure of the last candidate is worth a warning
              const bool isLast = (current->ai_next == nullptr);
              if (isLast) {
                  PLOG_WARNING << e.what();
              } else {
                  PLOG_DEBUG << e.what();
              }
          }
          current = current->ai_next;
      }

      // Every candidate failed
      freeaddrinfo(candidates);
      throw std::runtime_error("Connection to " + hostname + ":" + service + " failed");
  }
  145. void TcpTransport::connect(const sockaddr *addr, socklen_t addrlen) {
  146. std::unique_lock lock(mSockMutex);
  147. try {
  148. char node[MAX_NUMERICNODE_LEN];
  149. char serv[MAX_NUMERICSERV_LEN];
  150. if (getnameinfo(addr, addrlen, node, MAX_NUMERICNODE_LEN, serv, MAX_NUMERICSERV_LEN,
  151. NI_NUMERICHOST | NI_NUMERICSERV) == 0) {
  152. PLOG_DEBUG << "Trying address " << node << ":" << serv;
  153. }
  154. PLOG_VERBOSE << "Creating TCP socket";
  155. // Create socket
  156. mSock = ::socket(addr->sa_family, SOCK_STREAM, IPPROTO_TCP);
  157. if (mSock == INVALID_SOCKET)
  158. throw std::runtime_error("TCP socket creation failed");
  159. ctl_t b = 1;
  160. if (::ioctlsocket(mSock, FIONBIO, &b) < 0)
  161. throw std::runtime_error("Failed to set socket non-blocking mode");
  162. #ifdef __APPLE__
  163. // MacOS lacks MSG_NOSIGNAL and requires SO_NOSIGPIPE instead
  164. int opt = 1;
  165. if (::setsockopt(mSock, SOL_SOCKET, SO_NOSIGPIPE, &opt, sizeof(opt)) < 0)
  166. throw std::runtime_error("Failed to disable SIGPIPE for socket");
  167. #endif
  168. // Initiate connection
  169. int ret = ::connect(mSock, addr, addrlen);
  170. if (ret < 0 && sockerrno != SEINPROGRESS && sockerrno != SEWOULDBLOCK) {
  171. std::ostringstream msg;
  172. msg << "TCP connection to " << node << ":" << serv << " failed, errno=" << sockerrno;
  173. throw std::runtime_error(msg.str());
  174. }
  175. while (true) {
  176. fd_set writefds;
  177. FD_ZERO(&writefds);
  178. FD_SET(mSock, &writefds);
  179. struct timeval tv;
  180. tv.tv_sec = 10; // TODO: Make the timeout configurable
  181. tv.tv_usec = 0;
  182. ret = ::select(SOCKET_TO_INT(mSock) + 1, NULL, &writefds, NULL, &tv);
  183. if (ret < 0) {
  184. if (sockerrno == SEINTR || sockerrno == SEAGAIN) // interrupted
  185. continue;
  186. else
  187. throw std::runtime_error("Failed to wait for socket connection");
  188. }
  189. if (ret == 0) {
  190. std::ostringstream msg;
  191. msg << "TCP connection to " << node << ":" << serv << " timed out";
  192. throw std::runtime_error(msg.str());
  193. }
  194. int error = 0;
  195. socklen_t errorlen = sizeof(error);
  196. if (::getsockopt(mSock, SOL_SOCKET, SO_ERROR, (char *)&error, &errorlen) != 0)
  197. throw std::runtime_error("Failed to get socket error code");
  198. if (error != 0) {
  199. std::ostringstream msg;
  200. msg << "TCP connection to " << node << ":" << serv << " failed, errno=" << error;
  201. throw std::runtime_error(msg.str());
  202. }
  203. PLOG_DEBUG << "TCP connection to " << node << ":" << serv << " succeeded";
  204. break;
  205. }
  206. } catch (...) {
  207. if (mSock != INVALID_SOCKET) {
  208. ::closesocket(mSock);
  209. mSock = INVALID_SOCKET;
  210. }
  211. throw;
  212. }
  213. }
  214. void TcpTransport::close() {
  215. std::unique_lock lock(mSockMutex);
  216. if (mSock != INVALID_SOCKET) {
  217. PLOG_DEBUG << "Closing TCP socket";
  218. ::closesocket(mSock);
  219. mSock = INVALID_SOCKET;
  220. }
  221. changeState(State::Disconnected);
  222. interruptSelect();
  223. }
  224. bool TcpTransport::trySendQueue() {
  225. // mSockMutex must be locked
  226. while (auto next = mSendQueue.peek()) {
  227. message_ptr message = std::move(*next);
  228. if (!trySendMessage(message)) {
  229. mSendQueue.exchange(message);
  230. return false;
  231. }
  232. mSendQueue.pop();
  233. }
  234. return true;
  235. }
  236. bool TcpTransport::trySendMessage(message_ptr &message) {
  237. // mSockMutex must be locked
  238. auto data = reinterpret_cast<const char *>(message->data());
  239. auto size = message->size();
  240. while (size) {
  241. #if defined(__APPLE__) || defined(_WIN32)
  242. int flags = 0;
  243. #else
  244. int flags = MSG_NOSIGNAL;
  245. #endif
  246. int len = ::send(mSock, data, int(size), flags);
  247. if (len < 0) {
  248. if (sockerrno == SEAGAIN || sockerrno == SEWOULDBLOCK) {
  249. message = make_message(message->end() - size, message->end());
  250. return false;
  251. } else {
  252. throw std::runtime_error("Connection lost, errno=" + to_string(sockerrno));
  253. }
  254. }
  255. data += len;
  256. size -= len;
  257. }
  258. message = nullptr;
  259. return true;
  260. }
  261. void TcpTransport::runLoop() {
  262. const size_t bufferSize = 4096;
  263. // Connect
  264. try {
  265. changeState(State::Connecting);
  266. connect(mHostname, mService);
  267. } catch (const std::exception &e) {
  268. PLOG_ERROR << "TCP connect: " << e.what();
  269. changeState(State::Failed);
  270. return;
  271. }
  272. // Receive loop
  273. try {
  274. PLOG_INFO << "TCP connected";
  275. changeState(State::Connected);
  276. while (true) {
  277. std::unique_lock lock(mSockMutex);
  278. if (mSock == INVALID_SOCKET)
  279. break;
  280. fd_set readfds, writefds;
  281. int n = prepareSelect(readfds, writefds);
  282. struct timeval tv;
  283. tv.tv_sec = 10;
  284. tv.tv_usec = 0;
  285. lock.unlock();
  286. int ret = ::select(n, &readfds, &writefds, NULL, &tv);
  287. lock.lock();
  288. if (mSock == INVALID_SOCKET)
  289. break;
  290. if (ret < 0) {
  291. throw std::runtime_error("Failed to wait on socket");
  292. } else if (ret == 0) {
  293. PLOG_VERBOSE << "TCP is idle";
  294. lock.unlock(); // unlock now since the upper layer might send on incoming
  295. incoming(make_message(0));
  296. continue;
  297. }
  298. if (FD_ISSET(mSock, &writefds))
  299. trySendQueue();
  300. if (FD_ISSET(mSock, &readfds)) {
  301. char buffer[bufferSize];
  302. int len = ::recv(mSock, buffer, bufferSize, 0);
  303. if (len < 0) {
  304. if (sockerrno == SEAGAIN || sockerrno == SEWOULDBLOCK) {
  305. continue;
  306. } else {
  307. throw std::runtime_error("Connection lost");
  308. }
  309. }
  310. if (len == 0)
  311. break; // clean close
  312. lock.unlock(); // unlock now since the upper layer might send on incoming
  313. auto *b = reinterpret_cast<byte *>(buffer);
  314. incoming(make_message(b, b + len));
  315. }
  316. }
  317. } catch (const std::exception &e) {
  318. PLOG_ERROR << "TCP recv: " << e.what();
  319. }
  320. PLOG_INFO << "TCP disconnected";
  321. changeState(State::Disconnected);
  322. recv(nullptr);
  323. }
  324. int TcpTransport::prepareSelect(fd_set &readfds, fd_set &writefds) {
  325. FD_ZERO(&readfds);
  326. FD_ZERO(&writefds);
  327. FD_SET(mSock, &readfds);
  328. if (!mSendQueue.empty())
  329. FD_SET(mSock, &writefds);
  330. int n = SOCKET_TO_INT(mSock) + 1;
  331. int m = mInterrupter.prepare(readfds, writefds);
  332. return std::max(n, m);
  333. }
  334. void TcpTransport::interruptSelect() { mInterrupter.interrupt(); }
  335. } // namespace rtc
  336. #endif