pollservice.cpp 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. /**
  2. * Copyright (c) 2022 Paul-Louis Ageneau
  3. *
  4. * This Source Code Form is subject to the terms of the Mozilla Public
  5. * License, v. 2.0. If a copy of the MPL was not distributed with this
  6. * file, You can obtain one at https://mozilla.org/MPL/2.0/.
  7. */
  8. #include "pollservice.hpp"
  9. #include "internals.hpp"
  10. #if RTC_ENABLE_WEBSOCKET
  11. #include <algorithm>
  12. #include <cassert>
  13. namespace rtc::impl {
  14. using namespace std::chrono_literals;
  15. using std::chrono::duration_cast;
  16. using std::chrono::milliseconds;
  17. PollService &PollService::Instance() {
  18. static PollService *instance = new PollService;
  19. return *instance;
  20. }
  21. PollService::PollService() : mStopped(true) {}
  22. PollService::~PollService() {}
  23. void PollService::start() {
  24. mSocks = std::make_unique<SocketMap>();
  25. mInterrupter = std::make_unique<PollInterrupter>();
  26. mStopped = false;
  27. mThread = std::thread(&PollService::runLoop, this);
  28. }
  29. void PollService::join() {
  30. std::unique_lock lock(mMutex);
  31. if (std::exchange(mStopped, true))
  32. return;
  33. lock.unlock();
  34. mInterrupter->interrupt();
  35. mThread.join();
  36. mSocks.reset();
  37. mInterrupter.reset();
  38. }
  39. void PollService::add(socket_t sock, Params params) {
  40. assert(sock != INVALID_SOCKET);
  41. assert(params.callback);
  42. std::unique_lock lock(mMutex);
  43. PLOG_VERBOSE << "Registering socket in poll service, direction=" << params.direction;
  44. auto until = params.timeout ? std::make_optional(clock::now() + *params.timeout) : nullopt;
  45. assert(mSocks);
  46. mSocks->insert_or_assign(sock, SocketEntry{std::move(params), std::move(until)});
  47. assert(mInterrupter);
  48. mInterrupter->interrupt();
  49. }
  50. void PollService::remove(socket_t sock) {
  51. assert(sock != INVALID_SOCKET);
  52. std::unique_lock lock(mMutex);
  53. PLOG_VERBOSE << "Unregistering socket in poll service";
  54. assert(mSocks);
  55. mSocks->erase(sock);
  56. assert(mInterrupter);
  57. mInterrupter->interrupt();
  58. }
  59. void PollService::prepare(std::vector<struct pollfd> &pfds, optional<clock::time_point> &next) {
  60. std::unique_lock lock(mMutex);
  61. pfds.resize(1 + mSocks->size());
  62. next.reset();
  63. auto it = pfds.begin();
  64. mInterrupter->prepare(*it++);
  65. for (const auto &[sock, entry] : *mSocks) {
  66. it->fd = sock;
  67. switch (entry.params.direction) {
  68. case Direction::In:
  69. it->events = POLLIN;
  70. break;
  71. case Direction::Out:
  72. it->events = POLLOUT;
  73. break;
  74. default:
  75. it->events = POLLIN | POLLOUT;
  76. break;
  77. }
  78. if (entry.until)
  79. next = next ? std::min(*next, *entry.until) : *entry.until;
  80. ++it;
  81. }
  82. }
  83. void PollService::process(std::vector<struct pollfd> &pfds) {
  84. auto it = pfds.begin();
  85. if (it != pfds.end()) {
  86. std::unique_lock lock(mMutex);
  87. mInterrupter->process(*it++);
  88. }
  89. while (it != pfds.end()) {
  90. std::unique_lock lock(mMutex);
  91. socket_t sock = it->fd;
  92. auto jt = mSocks->find(sock);
  93. if (jt != mSocks->end()) {
  94. try {
  95. auto &entry = jt->second;
  96. const auto &params = entry.params;
  97. if (it->revents & POLLNVAL || it->revents & POLLERR) {
  98. PLOG_VERBOSE << "Poll error event";
  99. auto callback = std::move(params.callback);
  100. mSocks->erase(sock);
  101. callback(Event::Error);
  102. } else if (it->revents & POLLIN || it->revents & POLLOUT) {
  103. entry.until = params.timeout
  104. ? std::make_optional(clock::now() + *params.timeout)
  105. : nullopt;
  106. auto callback = params.callback;
  107. if (it->revents & POLLIN) {
  108. PLOG_VERBOSE << "Poll in event";
  109. callback(Event::In);
  110. }
  111. if (it->revents & POLLOUT) {
  112. PLOG_VERBOSE << "Poll out event";
  113. callback(Event::Out);
  114. }
  115. } else if (entry.until && clock::now() >= *entry.until) {
  116. PLOG_VERBOSE << "Poll timeout event";
  117. auto callback = std::move(params.callback);
  118. mSocks->erase(sock);
  119. callback(Event::Timeout);
  120. }
  121. } catch (const std::exception &e) {
  122. PLOG_WARNING << e.what();
  123. mSocks->erase(sock);
  124. }
  125. }
  126. ++it;
  127. }
  128. }
  129. void PollService::runLoop() {
  130. try {
  131. PLOG_DEBUG << "Poll service started";
  132. assert(mSocks);
  133. std::vector<struct pollfd> pfds;
  134. optional<clock::time_point> next;
  135. while (!mStopped) {
  136. prepare(pfds, next);
  137. int ret;
  138. do {
  139. int timeout;
  140. if (next) {
  141. auto msecs = duration_cast<milliseconds>(
  142. std::max(clock::duration::zero(), *next - clock::now() + 1ms));
  143. PLOG_VERBOSE << "Entering poll, timeout=" << msecs.count() << "ms";
  144. timeout = static_cast<int>(msecs.count());
  145. } else {
  146. PLOG_VERBOSE << "Entering poll";
  147. timeout = -1;
  148. }
  149. ret = ::poll(pfds.data(), static_cast<nfds_t>(pfds.size()), timeout);
  150. PLOG_VERBOSE << "Exiting poll";
  151. #ifdef _WIN32
  152. if (ret == WSAENOTSOCK)
  153. continue; // prepare again as the fd has been removed
  154. #endif
  155. } while (ret < 0 && (sockerrno == SEINTR || sockerrno == SEAGAIN));
  156. if (ret < 0)
  157. throw std::runtime_error("poll failed, errno=" + std::to_string(sockerrno));
  158. process(pfds);
  159. }
  160. } catch (const std::exception &e) {
  161. PLOG_FATAL << "Poll service failed: " << e.what();
  162. }
  163. PLOG_DEBUG << "Poll service stopped";
  164. }
  165. std::ostream &operator<<(std::ostream &out, PollService::Direction direction) {
  166. const char *str;
  167. switch (direction) {
  168. case PollService::Direction::In:
  169. str = "in";
  170. break;
  171. case PollService::Direction::Out:
  172. str = "out";
  173. break;
  174. case PollService::Direction::Both:
  175. str = "both";
  176. break;
  177. default:
  178. str = "unknown";
  179. break;
  180. }
  181. return out << str;
  182. }
  183. } // namespace rtc::impl
  184. #endif