pollservice.cpp 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. /**
  2. * Copyright (c) 2022 Paul-Louis Ageneau
  3. *
  4. * This Source Code Form is subject to the terms of the Mozilla Public
  5. * License, v. 2.0. If a copy of the MPL was not distributed with this
  6. * file, You can obtain one at https://mozilla.org/MPL/2.0/.
  7. */
  8. #include "pollservice.hpp"
  9. #include "internals.hpp"
  10. #include "utils.hpp"
  11. #if RTC_ENABLE_WEBSOCKET
  12. #include <algorithm>
  13. #include <cassert>
  14. namespace rtc::impl {
  15. using namespace std::chrono_literals;
  16. using std::chrono::duration_cast;
  17. using std::chrono::milliseconds;
  18. PollService &PollService::Instance() {
  19. static PollService *instance = new PollService;
  20. return *instance;
  21. }
  22. PollService::PollService() : mStopped(true) {}
  23. PollService::~PollService() {}
  24. void PollService::start() {
  25. mSocks = std::make_unique<SocketMap>();
  26. mInterrupter = std::make_unique<PollInterrupter>();
  27. mStopped = false;
  28. mThread = std::thread(&PollService::runLoop, this);
  29. }
  30. void PollService::join() {
  31. std::unique_lock lock(mMutex);
  32. if (std::exchange(mStopped, true))
  33. return;
  34. lock.unlock();
  35. mInterrupter->interrupt();
  36. mThread.join();
  37. mSocks.reset();
  38. mInterrupter.reset();
  39. }
  40. void PollService::add(socket_t sock, Params params) {
  41. assert(sock != INVALID_SOCKET);
  42. assert(params.callback);
  43. std::unique_lock lock(mMutex);
  44. PLOG_VERBOSE << "Registering socket in poll service, direction=" << params.direction;
  45. auto until = params.timeout ? std::make_optional(clock::now() + *params.timeout) : nullopt;
  46. assert(mSocks);
  47. mSocks->insert_or_assign(sock, SocketEntry{std::move(params), std::move(until)});
  48. assert(mInterrupter);
  49. mInterrupter->interrupt();
  50. }
  51. void PollService::remove(socket_t sock) {
  52. assert(sock != INVALID_SOCKET);
  53. std::unique_lock lock(mMutex);
  54. PLOG_VERBOSE << "Unregistering socket in poll service";
  55. assert(mSocks);
  56. mSocks->erase(sock);
  57. assert(mInterrupter);
  58. mInterrupter->interrupt();
  59. }
  60. void PollService::prepare(std::vector<struct pollfd> &pfds, optional<clock::time_point> &next) {
  61. std::unique_lock lock(mMutex);
  62. pfds.resize(1 + mSocks->size());
  63. next.reset();
  64. auto it = pfds.begin();
  65. mInterrupter->prepare(*it++);
  66. for (const auto &[sock, entry] : *mSocks) {
  67. it->fd = sock;
  68. switch (entry.params.direction) {
  69. case Direction::In:
  70. it->events = POLLIN;
  71. break;
  72. case Direction::Out:
  73. it->events = POLLOUT;
  74. break;
  75. default:
  76. it->events = POLLIN | POLLOUT;
  77. break;
  78. }
  79. if (entry.until)
  80. next = next ? std::min(*next, *entry.until) : *entry.until;
  81. ++it;
  82. }
  83. }
  84. void PollService::process(std::vector<struct pollfd> &pfds) {
  85. std::unique_lock lock(mMutex);
  86. auto it = pfds.begin();
  87. if (it != pfds.end()) {
  88. mInterrupter->process(*it++);
  89. }
  90. while (it != pfds.end()) {
  91. socket_t sock = it->fd;
  92. auto jt = mSocks->find(sock);
  93. if (jt != mSocks->end()) {
  94. try {
  95. auto &entry = jt->second;
  96. const auto &params = entry.params;
  97. if (it->revents & POLLNVAL || it->revents & POLLERR) {
  98. PLOG_VERBOSE << "Poll error event";
  99. auto callback = std::move(params.callback);
  100. mSocks->erase(sock);
  101. callback(Event::Error);
  102. } else if (it->revents & POLLIN || it->revents & POLLOUT || it->revents & POLLHUP) {
  103. entry.until = params.timeout
  104. ? std::make_optional(clock::now() + *params.timeout)
  105. : nullopt;
  106. auto callback = params.callback;
  107. if (it->revents & POLLIN ||
  108. it->revents & POLLHUP) { // Windows does not set POLLIN on close
  109. PLOG_VERBOSE << "Poll in event";
  110. callback(Event::In);
  111. }
  112. if (it->revents & POLLOUT) {
  113. PLOG_VERBOSE << "Poll out event";
  114. callback(Event::Out);
  115. }
  116. } else if (entry.until && clock::now() >= *entry.until) {
  117. PLOG_VERBOSE << "Poll timeout event";
  118. auto callback = std::move(params.callback);
  119. mSocks->erase(sock);
  120. callback(Event::Timeout);
  121. }
  122. } catch (const std::exception &e) {
  123. PLOG_WARNING << e.what();
  124. mSocks->erase(sock);
  125. }
  126. }
  127. ++it;
  128. }
  129. }
  130. void PollService::runLoop() {
  131. utils::this_thread::set_name("RTC poll");
  132. PLOG_DEBUG << "Poll service started";
  133. try {
  134. assert(mSocks);
  135. std::vector<struct pollfd> pfds;
  136. optional<clock::time_point> next;
  137. while (!mStopped) {
  138. prepare(pfds, next);
  139. int ret;
  140. do {
  141. int timeout;
  142. if (next) {
  143. auto msecs = duration_cast<milliseconds>(
  144. std::max(clock::duration::zero(), *next - clock::now() + 1ms));
  145. PLOG_VERBOSE << "Entering poll, timeout=" << msecs.count() << "ms";
  146. timeout = static_cast<int>(msecs.count());
  147. } else {
  148. PLOG_VERBOSE << "Entering poll";
  149. timeout = -1;
  150. }
  151. ret = ::poll(pfds.data(), static_cast<nfds_t>(pfds.size()), timeout);
  152. PLOG_VERBOSE << "Exiting poll";
  153. } while (ret < 0 && (sockerrno == SEINTR || sockerrno == SEAGAIN));
  154. #ifdef _WIN32
  155. if (ret == WSAENOTSOCK)
  156. continue; // prepare again as the fd has been removed
  157. #endif
  158. if (ret < 0)
  159. throw std::runtime_error("poll failed, errno=" + std::to_string(sockerrno));
  160. process(pfds);
  161. }
  162. } catch (const std::exception &e) {
  163. PLOG_FATAL << "Poll service failed: " << e.what();
  164. }
  165. PLOG_DEBUG << "Poll service stopped";
  166. }
  167. std::ostream &operator<<(std::ostream &out, PollService::Direction direction) {
  168. const char *str;
  169. switch (direction) {
  170. case PollService::Direction::In:
  171. str = "in";
  172. break;
  173. case PollService::Direction::Out:
  174. str = "out";
  175. break;
  176. case PollService::Direction::Both:
  177. str = "both";
  178. break;
  179. default:
  180. str = "unknown";
  181. break;
  182. }
  183. return out << str;
  184. }
  185. } // namespace rtc::impl
  186. #endif