tcptransport.cpp 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. /**
  2. * Copyright (c) 2020 Paul-Louis Ageneau
  3. *
  4. * This library is free software; you can redistribute it and/or
  5. * modify it under the terms of the GNU Lesser General Public
  6. * License as published by the Free Software Foundation; either
  7. * version 2.1 of the License, or (at your option) any later version.
  8. *
  9. * This library is distributed in the hope that it will be useful,
  10. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  12. * Lesser General Public License for more details.
  13. *
  14. * You should have received a copy of the GNU Lesser General Public
  15. * License along with this library; if not, write to the Free Software
  16. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  17. */
  18. #if RTC_ENABLE_WEBSOCKET
  19. #include "tcptransport.hpp"
  20. #include <exception>
  21. #ifndef _WIN32
  22. #include <fcntl.h>
  23. #include <unistd.h>
  24. #endif
  25. namespace rtc {
  26. using std::to_string;
  27. SelectInterrupter::SelectInterrupter() {
  28. #ifndef _WIN32
  29. int pipefd[2];
  30. if (::pipe(pipefd) != 0)
  31. throw std::runtime_error("Failed to create pipe");
  32. ::fcntl(pipefd[0], F_SETFL, O_NONBLOCK);
  33. ::fcntl(pipefd[1], F_SETFL, O_NONBLOCK);
  34. mPipeOut = pipefd[0]; // read
  35. mPipeIn = pipefd[1]; // write
  36. #endif
  37. }
  38. SelectInterrupter::~SelectInterrupter() {
  39. std::lock_guard lock(mMutex);
  40. #ifdef _WIN32
  41. if (mDummySock != INVALID_SOCKET)
  42. ::closesocket(mDummySock);
  43. #else
  44. ::close(mPipeIn);
  45. ::close(mPipeOut);
  46. #endif
  47. }
  48. int SelectInterrupter::prepare(fd_set &readfds, fd_set &writefds) {
  49. std::lock_guard lock(mMutex);
  50. #ifdef _WIN32
  51. if (mDummySock == INVALID_SOCKET)
  52. mDummySock = ::socket(AF_INET, SOCK_DGRAM, 0);
  53. FD_SET(mDummySock, &readfds);
  54. return SOCK_TO_INT(mDummySock) + 1;
  55. #else
  56. int ret;
  57. do {
  58. char dummy;
  59. ret = ::read(mPipeIn, &dummy, 1);
  60. } while (ret > 0);
  61. FD_SET(mPipeIn, &readfds);
  62. return mPipeIn + 1;
  63. #endif
  64. }
  65. void SelectInterrupter::interrupt() {
  66. std::lock_guard lock(mMutex);
  67. #ifdef _WIN32
  68. if (mDummySock != INVALID_SOCKET) {
  69. ::closesocket(mDummySock);
  70. mDummySock = INVALID_SOCKET;
  71. }
  72. #else
  73. char dummy = 0;
  74. ::write(mPipeOut, &dummy, 1);
  75. #endif
  76. }
  77. TcpTransport::TcpTransport(const string &hostname, const string &service, state_callback callback)
  78. : Transport(nullptr, std::move(callback)), mHostname(hostname), mService(service) {
  79. PLOG_DEBUG << "Initializing TCP transport";
  80. mThread = std::thread(&TcpTransport::runLoop, this);
  81. }
  82. TcpTransport::~TcpTransport() {
  83. stop();
  84. }
  85. bool TcpTransport::stop() {
  86. if (!Transport::stop())
  87. return false;
  88. PLOG_DEBUG << "Waiting for TCP recv thread";
  89. close();
  90. mThread.join();
  91. return true;
  92. }
  93. bool TcpTransport::send(message_ptr message) {
  94. if (state() != State::Connected)
  95. return false;
  96. if (!message)
  97. return mSendQueue.empty();
  98. PLOG_VERBOSE << "Send size=" << (message ? message->size() : 0);
  99. return outgoing(message);
  100. }
  101. void TcpTransport::incoming(message_ptr message) {
  102. if (!message)
  103. return;
  104. PLOG_VERBOSE << "Incoming size=" << message->size();
  105. recv(message);
  106. }
  107. bool TcpTransport::outgoing(message_ptr message) {
  108. // If nothing is pending, try to send directly
  109. // It's safe because if the queue is empty, the thread is not sending
  110. if (mSendQueue.empty() && trySendMessage(message))
  111. return true;
  112. mSendQueue.push(message);
  113. interruptSelect(); // so the thread waits for writability
  114. return false;
  115. }
  116. void TcpTransport::connect(const string &hostname, const string &service) {
  117. PLOG_DEBUG << "Connecting to " << hostname << ":" << service;
  118. struct addrinfo hints = {};
  119. hints.ai_family = AF_UNSPEC;
  120. hints.ai_socktype = SOCK_STREAM;
  121. hints.ai_protocol = IPPROTO_TCP;
  122. hints.ai_flags = AI_ADDRCONFIG;
  123. struct addrinfo *result = nullptr;
  124. if (getaddrinfo(hostname.c_str(), service.c_str(), &hints, &result))
  125. throw std::runtime_error("Resolution failed for \"" + hostname + ":" + service + "\"");
  126. for (auto p = result; p; p = p->ai_next) {
  127. try {
  128. connect(p->ai_addr, p->ai_addrlen);
  129. PLOG_INFO << "Connected to " << hostname << ":" << service;
  130. freeaddrinfo(result);
  131. return;
  132. } catch (const std::runtime_error &e) {
  133. if (p->ai_next) {
  134. PLOG_DEBUG << e.what();
  135. } else {
  136. PLOG_WARNING << e.what();
  137. }
  138. }
  139. }
  140. freeaddrinfo(result);
  141. std::ostringstream msg;
  142. msg << "Connection to " << hostname << ":" << service << " failed";
  143. throw std::runtime_error(msg.str());
  144. }
  145. void TcpTransport::connect(const sockaddr *addr, socklen_t addrlen) {
  146. try {
  147. char node[MAX_NUMERICNODE_LEN];
  148. char serv[MAX_NUMERICSERV_LEN];
  149. if (getnameinfo(addr, addrlen, node, MAX_NUMERICNODE_LEN, serv, MAX_NUMERICSERV_LEN,
  150. NI_NUMERICHOST | NI_NUMERICSERV) == 0) {
  151. PLOG_DEBUG << "Trying address " << node << ":" << serv;
  152. }
  153. PLOG_VERBOSE << "Creating TCP socket";
  154. // Create socket
  155. mSock = ::socket(addr->sa_family, SOCK_STREAM, IPPROTO_TCP);
  156. if (mSock == INVALID_SOCKET)
  157. throw std::runtime_error("TCP socket creation failed");
  158. ctl_t b = 1;
  159. if (::ioctlsocket(mSock, FIONBIO, &b) < 0)
  160. throw std::runtime_error("Failed to set socket non-blocking mode");
  161. #ifdef __APPLE__
  162. // MacOS lacks MSG_NOSIGNAL and requires SO_NOSIGPIPE instead
  163. int opt = 1;
  164. if (::setsockopt(mSock, SOL_SOCKET, SO_NOSIGPIPE, &opt, sizeof(opt)) < 0)
  165. throw std::runtime_error("Failed to disable SIGPIPE for socket");
  166. #endif
  167. // Initiate connection
  168. int ret = ::connect(mSock, addr, addrlen);
  169. if (ret < 0 && errno != EINPROGRESS) {
  170. std::ostringstream msg;
  171. msg << "TCP connection to " << node << ":" << serv << " failed, errno=" << sockerrno;
  172. throw std::runtime_error(msg.str());
  173. }
  174. fd_set writefds;
  175. FD_ZERO(&writefds);
  176. FD_SET(mSock, &writefds);
  177. struct timeval tv;
  178. tv.tv_sec = 10; // TODO
  179. tv.tv_usec = 0;
  180. ret = ::select(SOCKET_TO_INT(mSock) + 1, NULL, &writefds, NULL, &tv);
  181. if (ret < 0)
  182. throw std::runtime_error("Failed to wait for socket connection");
  183. if (ret == 0) {
  184. std::ostringstream msg;
  185. msg << "TCP connection to " << node << ":" << serv << " timed out";
  186. throw std::runtime_error(msg.str());
  187. }
  188. int error = 0;
  189. socklen_t errorlen = sizeof(error);
  190. if (::getsockopt(mSock, SOL_SOCKET, SO_ERROR, &error, &errorlen) != 0)
  191. throw std::runtime_error("Failed to get socket error code");
  192. if (error != 0) {
  193. std::ostringstream msg;
  194. msg << "TCP connection to " << node << ":" << serv << " failed, errno=" << error;
  195. throw std::runtime_error(msg.str());
  196. }
  197. PLOG_DEBUG << "TCP connection to " << node << ":" << serv << " succeeded";
  198. } catch (...) {
  199. if (mSock != INVALID_SOCKET) {
  200. ::closesocket(mSock);
  201. mSock = INVALID_SOCKET;
  202. }
  203. throw;
  204. }
  205. }
  206. void TcpTransport::close() {
  207. if (mSock != INVALID_SOCKET) {
  208. PLOG_DEBUG << "Closing TCP socket";
  209. ::closesocket(mSock);
  210. mSock = INVALID_SOCKET;
  211. }
  212. changeState(State::Disconnected);
  213. }
  214. bool TcpTransport::trySendQueue() {
  215. while (auto next = mSendQueue.peek()) {
  216. auto message = *next;
  217. if (!trySendMessage(message)) {
  218. mSendQueue.exchange(message);
  219. return false;
  220. }
  221. mSendQueue.pop();
  222. }
  223. return true;
  224. }
  225. bool TcpTransport::trySendMessage(message_ptr &message) {
  226. auto data = reinterpret_cast<const char *>(message->data());
  227. auto size = message->size();
  228. while (size) {
  229. #ifdef __APPLE__
  230. int flags = 0;
  231. #else
  232. int flags = MSG_NOSIGNAL;
  233. #endif
  234. int len = ::send(mSock, data, size, flags);
  235. if (len < 0) {
  236. if (errno == EAGAIN || errno == EWOULDBLOCK) {
  237. message = make_message(message->end() - size, message->end());
  238. return false;
  239. } else {
  240. throw std::runtime_error("Connection lost, errno=" + to_string(sockerrno));
  241. }
  242. }
  243. data += len;
  244. size -= len;
  245. }
  246. message = nullptr;
  247. return true;
  248. }
  249. void TcpTransport::runLoop() {
  250. const size_t bufferSize = 4096;
  251. // Connect
  252. try {
  253. changeState(State::Connecting);
  254. connect(mHostname, mService);
  255. } catch (const std::exception &e) {
  256. PLOG_ERROR << "TCP connect: " << e.what();
  257. changeState(State::Failed);
  258. return;
  259. }
  260. // Receive loop
  261. try {
  262. PLOG_INFO << "TCP connected";
  263. changeState(State::Connected);
  264. while (true) {
  265. fd_set readfds, writefds;
  266. int n = prepareSelect(readfds, writefds);
  267. struct timeval tv;
  268. tv.tv_sec = 10;
  269. tv.tv_usec = 0;
  270. int ret = ::select(n, &readfds, &writefds, NULL, &tv);
  271. if (ret < 0) {
  272. throw std::runtime_error("Failed to wait on socket");
  273. } else if (ret == 0) {
  274. PLOG_VERBOSE << "TCP is idle";
  275. incoming(make_message(0));
  276. continue;
  277. }
  278. if (FD_ISSET(mSock, &writefds))
  279. trySendQueue();
  280. if (FD_ISSET(mSock, &readfds)) {
  281. char buffer[bufferSize];
  282. int len = ::recv(mSock, buffer, bufferSize, 0);
  283. if (len < 0) {
  284. if (errno == EAGAIN || errno == EWOULDBLOCK) {
  285. continue;
  286. } else {
  287. throw std::runtime_error("Connection lost");
  288. }
  289. }
  290. if (len == 0)
  291. break; // clean close
  292. auto *b = reinterpret_cast<byte *>(buffer);
  293. incoming(make_message(b, b + len));
  294. }
  295. }
  296. } catch (const std::exception &e) {
  297. PLOG_ERROR << "TCP recv: " << e.what();
  298. }
  299. PLOG_INFO << "TCP disconnected";
  300. changeState(State::Disconnected);
  301. recv(nullptr);
  302. }
  303. int TcpTransport::prepareSelect(fd_set &readfds, fd_set &writefds) {
  304. FD_ZERO(&readfds);
  305. FD_ZERO(&writefds);
  306. FD_SET(mSock, &readfds);
  307. if (!mSendQueue.empty())
  308. FD_SET(mSock, &writefds);
  309. int n = SOCKET_TO_INT(mSock) + 1;
  310. int m = mInterrupter.prepare(readfds, writefds);
  311. return std::max(n, m);
  312. }
  313. void TcpTransport::interruptSelect() { mInterrupter.interrupt(); }
  314. } // namespace rtc
  315. #endif