sctptransport.cpp 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945
  1. /**
  2. * Copyright (c) 2019 Paul-Louis Ageneau
  3. *
  4. * This library is free software; you can redistribute it and/or
  5. * modify it under the terms of the GNU Lesser General Public
  6. * License as published by the Free Software Foundation; either
  7. * version 2.1 of the License, or (at your option) any later version.
  8. *
  9. * This library is distributed in the hope that it will be useful,
  10. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  12. * Lesser General Public License for more details.
  13. *
  14. * You should have received a copy of the GNU Lesser General Public
  15. * License along with this library; if not, write to the Free Software
  16. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  17. */
  18. #include "sctptransport.hpp"
  19. #include "dtlstransport.hpp"
  20. #include "internals.hpp"
  21. #include "logcounter.hpp"
  22. #include <chrono>
  23. #include <cstdarg>
  24. #include <cstdio>
  25. #include <exception>
  26. #include <iostream>
  27. #include <limits>
  28. #include <shared_mutex>
  29. #include <thread>
  30. #include <unordered_set>
  31. #include <vector>
  32. // RFC 8831: SCTP MUST support performing Path MTU discovery without relying on ICMP or ICMPv6 as
  33. // specified in [RFC4821] by using probing messages specified in [RFC4820].
  34. // See https://tools.ietf.org/html/rfc8831#section-5
  35. //
  36. // However, usrsctp does not implement Path MTU discovery, so we need to disable it for now.
  37. // See https://github.com/sctplab/usrsctp/issues/205
  38. #define USE_PMTUD 0
  39. // TODO: When Path MTU discovery is supported, it needs to be enabled with libjuice as ICE backend
  40. // on all platforms except Mac OS where the Don't Fragment (DF) flag can't be set:
  41. /*
  42. #if !USE_NICE
  43. #ifndef __APPLE__
  44. // libjuice enables Linux path MTU discovery or sets the DF flag
  45. #define USE_PMTUD 1
  46. #else
  47. // Setting the DF flag is not available on Mac OS
  48. #define USE_PMTUD 0
  49. #endif
  50. #else // USE_NICE == 1
  51. #define USE_PMTUD 0
  52. #endif
  53. */
  54. using namespace std::chrono_literals;
  55. using namespace std::chrono;
  56. namespace {
  57. template <typename T> uint16_t to_uint16(T i) {
  58. if (i >= 0 && static_cast<typename std::make_unsigned<T>::type>(i) <=
  59. std::numeric_limits<uint16_t>::max())
  60. return static_cast<uint16_t>(i);
  61. else
  62. throw std::invalid_argument("Integer out of range");
  63. }
  64. template <typename T> uint32_t to_uint32(T i) {
  65. if (i >= 0 && static_cast<typename std::make_unsigned<T>::type>(i) <=
  66. std::numeric_limits<uint32_t>::max())
  67. return static_cast<uint32_t>(i);
  68. else
  69. throw std::invalid_argument("Integer out of range");
  70. }
  71. } // namespace
  72. namespace rtc::impl {
  73. static LogCounter COUNTER_UNKNOWN_PPID(plog::warning,
  74. "Number of SCTP packets received with an unknown PPID");
  75. static LogCounter
  76. COUNTER_BAD_NOTIF_LEN(plog::warning,
  77. "Number of SCTP packets received with an bad notification length");
  78. static LogCounter COUNTER_BAD_SCTP_STATUS(plog::warning,
  79. "Number of SCTP packets received with a bad status");
  80. class SctpTransport::InstancesSet {
  81. public:
  82. void insert(SctpTransport *instance) {
  83. std::unique_lock lock(mMutex);
  84. mSet.insert(instance);
  85. }
  86. void erase(SctpTransport *instance) {
  87. std::unique_lock lock(mMutex);
  88. mSet.erase(instance);
  89. }
  90. using shared_lock = std::shared_lock<std::shared_mutex>;
  91. optional<shared_lock> lock(SctpTransport *instance) {
  92. shared_lock lock(mMutex);
  93. return mSet.find(instance) != mSet.end() ? std::make_optional(std::move(lock)) : nullopt;
  94. }
  95. private:
  96. std::unordered_set<SctpTransport *> mSet;
  97. std::shared_mutex mMutex;
  98. };
  99. SctpTransport::InstancesSet *SctpTransport::Instances = new InstancesSet;
  100. void SctpTransport::Init() {
  101. usrsctp_init(0, SctpTransport::WriteCallback, SctpTransport::DebugCallback);
  102. usrsctp_enable_crc32c_offload(); // We'll compute CRC32 only for outgoing packets
  103. usrsctp_sysctl_set_sctp_pr_enable(1); // Enable Partial Reliability Extension (RFC 3758)
  104. usrsctp_sysctl_set_sctp_ecn_enable(0); // Disable Explicit Congestion Notification
  105. #ifdef SCTP_DEBUG
  106. usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_ALL);
  107. #endif
  108. }
  109. void SctpTransport::SetSettings(const SctpSettings &s) {
  110. // The send and receive window size of usrsctp is 256KiB, which is too small for realistic RTTs,
  111. // therefore we increase it to 1MiB by default for better performance.
  112. // See https://bugzilla.mozilla.org/show_bug.cgi?id=1051685
  113. usrsctp_sysctl_set_sctp_recvspace(to_uint32(s.recvBufferSize.value_or(1024 * 1024)));
  114. usrsctp_sysctl_set_sctp_sendspace(to_uint32(s.sendBufferSize.value_or(1024 * 1024)));
  115. // Increase maximum chunks number on queue to 10K by default
  116. usrsctp_sysctl_set_sctp_max_chunks_on_queue(to_uint32(s.maxChunksOnQueue.value_or(10 * 1024)));
  117. // Increase initial congestion window size to 10 MTUs (RFC 6928) by default
  118. usrsctp_sysctl_set_sctp_initial_cwnd(to_uint32(s.initialCongestionWindow.value_or(10)));
  119. // Set max burst to 10 MTUs by default (max burst is initially 0, meaning disabled)
  120. usrsctp_sysctl_set_sctp_max_burst_default(to_uint32(s.maxBurst.value_or(10)));
  121. // Use standard SCTP congestion control (RFC 4960) by default
  122. // See https://github.com/paullouisageneau/libdatachannel/issues/354
  123. usrsctp_sysctl_set_sctp_default_cc_module(to_uint32(s.congestionControlModule.value_or(0)));
  124. // Reduce SACK delay to 20ms by default (the recommended default value from RFC 4960 is 200ms)
  125. usrsctp_sysctl_set_sctp_delayed_sack_time_default(
  126. to_uint32(s.delayedSackTime.value_or(20ms).count()));
  127. // RTO settings
  128. // RFC 2988 recommends a 1s min RTO, which is very high, but TCP on Linux has a 200ms min RTO
  129. usrsctp_sysctl_set_sctp_rto_min_default(
  130. to_uint32(s.minRetransmitTimeout.value_or(200ms).count()));
  131. // Set only 10s as max RTO instead of 60s for shorter connection timeout
  132. usrsctp_sysctl_set_sctp_rto_max_default(
  133. to_uint32(s.maxRetransmitTimeout.value_or(10000ms).count()));
  134. usrsctp_sysctl_set_sctp_init_rto_max_default(
  135. to_uint32(s.maxRetransmitTimeout.value_or(10000ms).count()));
  136. // Still set 1s as initial RTO
  137. usrsctp_sysctl_set_sctp_rto_initial_default(
  138. to_uint32(s.initialRetransmitTimeout.value_or(1000ms).count()));
  139. // RTX settings
  140. // 5 retransmissions instead of 8 to shorten the backoff for shorter connection timeout
  141. auto maxRtx = to_uint32(s.maxRetransmitAttempts.value_or(5));
  142. usrsctp_sysctl_set_sctp_init_rtx_max_default(maxRtx);
  143. usrsctp_sysctl_set_sctp_assoc_rtx_max_default(maxRtx);
  144. usrsctp_sysctl_set_sctp_path_rtx_max_default(maxRtx); // single path
  145. // Heartbeat interval
  146. usrsctp_sysctl_set_sctp_heartbeat_interval_default(
  147. to_uint32(s.heartbeatInterval.value_or(10000ms).count()));
  148. }
  149. void SctpTransport::Cleanup() {
  150. while (usrsctp_finish() != 0)
  151. std::this_thread::sleep_for(100ms);
  152. }
  153. SctpTransport::SctpTransport(shared_ptr<Transport> lower, const Configuration &config, Ports ports,
  154. message_callback recvCallback, amount_callback bufferedAmountCallback,
  155. state_callback stateChangeCallback)
  156. : Transport(lower, std::move(stateChangeCallback)), mPorts(std::move(ports)),
  157. mSendQueue(0, message_size_func), mBufferedAmountCallback(std::move(bufferedAmountCallback)) {
  158. onRecv(std::move(recvCallback));
  159. PLOG_DEBUG << "Initializing SCTP transport";
  160. usrsctp_register_address(this);
  161. Instances->insert(this);
  162. mSock = usrsctp_socket(AF_CONN, SOCK_STREAM, IPPROTO_SCTP, nullptr, nullptr, 0, nullptr);
  163. if (!mSock)
  164. throw std::runtime_error("Could not create SCTP socket, errno=" + std::to_string(errno));
  165. usrsctp_set_upcall(mSock, &SctpTransport::UpcallCallback, this);
  166. if (usrsctp_set_non_blocking(mSock, 1))
  167. throw std::runtime_error("Unable to set non-blocking mode, errno=" + std::to_string(errno));
  168. // SCTP must stop sending after the lower layer is shut down, so disable linger
  169. struct linger sol = {};
  170. sol.l_onoff = 1;
  171. sol.l_linger = 0;
  172. if (usrsctp_setsockopt(mSock, SOL_SOCKET, SO_LINGER, &sol, sizeof(sol)))
  173. throw std::runtime_error("Could not set socket option SO_LINGER, errno=" +
  174. std::to_string(errno));
  175. struct sctp_assoc_value av = {};
  176. av.assoc_id = SCTP_ALL_ASSOC;
  177. av.assoc_value = 1;
  178. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, &av, sizeof(av)))
  179. throw std::runtime_error("Could not set socket option SCTP_ENABLE_STREAM_RESET, errno=" +
  180. std::to_string(errno));
  181. int on = 1;
  182. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_RECVRCVINFO, &on, sizeof(on)))
  183. throw std::runtime_error("Could set socket option SCTP_RECVRCVINFO, errno=" +
  184. std::to_string(errno));
  185. struct sctp_event se = {};
  186. se.se_assoc_id = SCTP_ALL_ASSOC;
  187. se.se_on = 1;
  188. se.se_type = SCTP_ASSOC_CHANGE;
  189. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_EVENT, &se, sizeof(se)))
  190. throw std::runtime_error("Could not subscribe to event SCTP_ASSOC_CHANGE, errno=" +
  191. std::to_string(errno));
  192. se.se_type = SCTP_SENDER_DRY_EVENT;
  193. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_EVENT, &se, sizeof(se)))
  194. throw std::runtime_error("Could not subscribe to event SCTP_SENDER_DRY_EVENT, errno=" +
  195. std::to_string(errno));
  196. se.se_type = SCTP_STREAM_RESET_EVENT;
  197. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_EVENT, &se, sizeof(se)))
  198. throw std::runtime_error("Could not subscribe to event SCTP_STREAM_RESET_EVENT, errno=" +
  199. std::to_string(errno));
  200. // RFC 8831 6.6. Transferring User Data on a Data Channel
  201. // The sender SHOULD disable the Nagle algorithm (see [RFC1122) to minimize the latency
  202. // See https://tools.ietf.org/html/rfc8831#section-6.6
  203. int nodelay = 1;
  204. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_NODELAY, &nodelay, sizeof(nodelay)))
  205. throw std::runtime_error("Could not set socket option SCTP_NODELAY, errno=" +
  206. std::to_string(errno));
  207. struct sctp_paddrparams spp = {};
  208. // Enable SCTP heartbeats
  209. spp.spp_flags = SPP_HB_ENABLE;
  210. // RFC 8261 5. DTLS considerations:
  211. // If path MTU discovery is performed by the SCTP layer and IPv4 is used as the network-layer
  212. // protocol, the DTLS implementation SHOULD allow the DTLS user to enforce that the
  213. // corresponding IPv4 packet is sent with the Don't Fragment (DF) bit set. If controlling the DF
  214. // bit is not possible (for example, due to implementation restrictions), a safe value for the
  215. // path MTU has to be used by the SCTP stack. It is RECOMMENDED that the safe value not exceed
  216. // 1200 bytes.
  217. // See https://tools.ietf.org/html/rfc8261#section-5
  218. #if USE_PMTUD
  219. if (!config.mtu.has_value()) {
  220. #else
  221. if (false) {
  222. #endif
  223. // Enable SCTP path MTU discovery
  224. spp.spp_flags |= SPP_PMTUD_ENABLE;
  225. PLOG_VERBOSE << "Path MTU discovery enabled";
  226. } else {
  227. // Fall back to a safe MTU value.
  228. spp.spp_flags |= SPP_PMTUD_DISABLE;
  229. // The MTU value provided specifies the space available for chunks in the
  230. // packet, so we also subtract the SCTP header size.
  231. size_t pmtu = config.mtu.value_or(DEFAULT_MTU) - 12 - 48 - 8 - 40; // SCTP/DTLS/UDP/IPv6
  232. spp.spp_pathmtu = to_uint32(pmtu);
  233. PLOG_VERBOSE << "Path MTU discovery disabled, SCTP MTU set to " << pmtu;
  234. }
  235. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS, &spp, sizeof(spp)))
  236. throw std::runtime_error("Could not set socket option SCTP_PEER_ADDR_PARAMS, errno=" +
  237. std::to_string(errno));
  238. // RFC 8831 6.2. SCTP Association Management
  239. // The number of streams negotiated during SCTP association setup SHOULD be 65535, which is the
  240. // maximum number of streams that can be negotiated during the association setup.
  241. // See https://tools.ietf.org/html/rfc8831#section-6.2
  242. struct sctp_initmsg sinit = {};
  243. sinit.sinit_num_ostreams = 65535;
  244. sinit.sinit_max_instreams = 65535;
  245. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_INITMSG, &sinit, sizeof(sinit)))
  246. throw std::runtime_error("Could not set socket option SCTP_INITMSG, errno=" +
  247. std::to_string(errno));
  248. // Prevent fragmented interleave of messages (i.e. level 0), see RFC 6458 section 8.1.20.
  249. // Unless the user has set the fragmentation interleave level to 0, notifications
  250. // may also be interleaved with partially delivered messages.
  251. int level = 0;
  252. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_FRAGMENT_INTERLEAVE, &level, sizeof(level)))
  253. throw std::runtime_error("Could not disable SCTP fragmented interleave, errno=" +
  254. std::to_string(errno));
  255. int rcvBuf = 0;
  256. socklen_t rcvBufLen = sizeof(rcvBuf);
  257. if (usrsctp_getsockopt(mSock, SOL_SOCKET, SO_RCVBUF, &rcvBuf, &rcvBufLen))
  258. throw std::runtime_error("Could not get SCTP recv buffer size, errno=" +
  259. std::to_string(errno));
  260. int sndBuf = 0;
  261. socklen_t sndBufLen = sizeof(sndBuf);
  262. if (usrsctp_getsockopt(mSock, SOL_SOCKET, SO_SNDBUF, &sndBuf, &sndBufLen))
  263. throw std::runtime_error("Could not get SCTP send buffer size, errno=" +
  264. std::to_string(errno));
  265. // Ensure the buffer is also large enough to accomodate the largest messages
  266. const size_t maxMessageSize = config.maxMessageSize.value_or(DEFAULT_LOCAL_MAX_MESSAGE_SIZE);
  267. const int minBuf = int(std::min(maxMessageSize, size_t(std::numeric_limits<int>::max())));
  268. rcvBuf = std::max(rcvBuf, minBuf);
  269. sndBuf = std::max(sndBuf, minBuf);
  270. if (usrsctp_setsockopt(mSock, SOL_SOCKET, SO_RCVBUF, &rcvBuf, sizeof(rcvBuf)))
  271. throw std::runtime_error("Could not set SCTP recv buffer size, errno=" +
  272. std::to_string(errno));
  273. if (usrsctp_setsockopt(mSock, SOL_SOCKET, SO_SNDBUF, &sndBuf, sizeof(sndBuf)))
  274. throw std::runtime_error("Could not set SCTP send buffer size, errno=" +
  275. std::to_string(errno));
  276. }
  277. SctpTransport::~SctpTransport() {
  278. stop();
  279. close();
  280. usrsctp_deregister_address(this);
  281. Instances->erase(this);
  282. }
  283. void SctpTransport::start() {
  284. Transport::start();
  285. registerIncoming();
  286. connect();
  287. }
  288. bool SctpTransport::stop() {
  289. // Transport::stop() will unregister incoming() from the lower layer, therefore we need to make
  290. // sure the thread from lower layers is not blocked in incoming() by the WrittenOnce condition.
  291. mWrittenOnce = true;
  292. mWrittenCondition.notify_all();
  293. if (!Transport::stop())
  294. return false;
  295. mSendQueue.stop();
  296. flush();
  297. shutdown();
  298. return true;
  299. }
  300. void SctpTransport::close() {
  301. if (mSock) {
  302. mProcessor.join();
  303. usrsctp_close(mSock);
  304. mSock = nullptr;
  305. }
  306. }
  307. struct sockaddr_conn SctpTransport::getSockAddrConn(uint16_t port) {
  308. struct sockaddr_conn sconn = {};
  309. sconn.sconn_family = AF_CONN;
  310. sconn.sconn_port = htons(port);
  311. sconn.sconn_addr = this;
  312. #ifdef HAVE_SCONN_LEN
  313. sconn.sconn_len = sizeof(sconn);
  314. #endif
  315. return sconn;
  316. }
  317. void SctpTransport::connect() {
  318. if (!mSock)
  319. throw std::logic_error("Attempted SCTP connect with closed socket");
  320. PLOG_DEBUG << "SCTP connecting (local port=" << mPorts.local
  321. << ", remote port=" << mPorts.remote << ")";
  322. changeState(State::Connecting);
  323. auto local = getSockAddrConn(mPorts.local);
  324. if (usrsctp_bind(mSock, reinterpret_cast<struct sockaddr *>(&local), sizeof(local)))
  325. throw std::runtime_error("Could not bind usrsctp socket, errno=" + std::to_string(errno));
  326. // According to RFC 8841, both endpoints must initiate the SCTP association, in a
  327. // simultaneous-open manner, irrelevent to the SDP setup role.
  328. // See https://tools.ietf.org/html/rfc8841#section-9.3
  329. auto remote = getSockAddrConn(mPorts.remote);
  330. int ret = usrsctp_connect(mSock, reinterpret_cast<struct sockaddr *>(&remote), sizeof(remote));
  331. if (ret && errno != EINPROGRESS)
  332. throw std::runtime_error("Connection attempt failed, errno=" + std::to_string(errno));
  333. }
  334. void SctpTransport::shutdown() {
  335. if (!mSock)
  336. return;
  337. PLOG_DEBUG << "SCTP shutdown";
  338. if (usrsctp_shutdown(mSock, SHUT_RDWR) != 0 && errno != ENOTCONN) {
  339. PLOG_WARNING << "SCTP shutdown failed, errno=" << errno;
  340. }
  341. close();
  342. PLOG_INFO << "SCTP disconnected";
  343. changeState(State::Disconnected);
  344. mWrittenCondition.notify_all();
  345. }
  346. bool SctpTransport::send(message_ptr message) {
  347. std::lock_guard lock(mSendMutex);
  348. if (!message)
  349. return trySendQueue();
  350. PLOG_VERBOSE << "Send size=" << message->size();
  351. // Flush the queue, and if nothing is pending, try to send directly
  352. if (trySendQueue() && trySendMessage(message))
  353. return true;
  354. mSendQueue.push(message);
  355. updateBufferedAmount(to_uint16(message->stream), ptrdiff_t(message_size_func(message)));
  356. return false;
  357. }
  358. bool SctpTransport::flush() {
  359. try {
  360. std::lock_guard lock(mSendMutex);
  361. trySendQueue();
  362. return true;
  363. } catch (const std::exception &e) {
  364. PLOG_WARNING << "SCTP flush: " << e.what();
  365. return false;
  366. }
  367. }
  368. void SctpTransport::closeStream(unsigned int stream) {
  369. std::lock_guard lock(mSendMutex);
  370. // RFC 8831 6.7. Closing a Data Channel
  371. // Closing of a data channel MUST be signaled by resetting the corresponding outgoing streams
  372. // See https://tools.ietf.org/html/rfc8831#section-6.7
  373. mSendQueue.push(make_message(0, Message::Reset, to_uint16(stream)));
  374. // This method must not call the buffered callback synchronously
  375. mProcessor.enqueue(&SctpTransport::flush, this);
  376. }
  377. void SctpTransport::incoming(message_ptr message) {
  378. // There could be a race condition here where we receive the remote INIT before the local one is
  379. // sent, which would result in the connection being aborted. Therefore, we need to wait for data
  380. // to be sent on our side (i.e. the local INIT) before proceeding.
  381. if (!mWrittenOnce) { // test the atomic boolean is not set first to prevent a lock contention
  382. std::unique_lock lock(mWriteMutex);
  383. mWrittenCondition.wait(lock, [&]() { return mWrittenOnce.load(); });
  384. }
  385. if (!message) {
  386. PLOG_INFO << "SCTP disconnected";
  387. changeState(State::Disconnected);
  388. recv(nullptr);
  389. return;
  390. }
  391. PLOG_VERBOSE << "Incoming size=" << message->size();
  392. usrsctp_conninput(this, message->data(), message->size(), 0);
  393. }
  394. bool SctpTransport::outgoing(message_ptr message) {
  395. // Set recommended medium-priority DSCP value
  396. // See https://datatracker.ietf.org/doc/html/rfc8837#section-5
  397. message->dscp = 10; // AF11: Assured Forwarding class 1, low drop probability
  398. return Transport::outgoing(std::move(message));
  399. }
  400. void SctpTransport::doRecv() {
  401. std::lock_guard lock(mRecvMutex);
  402. --mPendingRecvCount;
  403. try {
  404. while (state() != State::Disconnected && state() != State::Failed) {
  405. const size_t bufferSize = 65536;
  406. byte buffer[bufferSize];
  407. socklen_t fromlen = 0;
  408. struct sctp_rcvinfo info = {};
  409. socklen_t infolen = sizeof(info);
  410. unsigned int infotype = 0;
  411. int flags = 0;
  412. ssize_t len = usrsctp_recvv(mSock, buffer, bufferSize, nullptr, &fromlen, &info,
  413. &infolen, &infotype, &flags);
  414. if (len < 0) {
  415. if (errno == EWOULDBLOCK || errno == EAGAIN || errno == ECONNRESET)
  416. break;
  417. else
  418. throw std::runtime_error("SCTP recv failed, errno=" + std::to_string(errno));
  419. }
  420. PLOG_VERBOSE << "SCTP recv, len=" << len;
  421. // SCTP_FRAGMENT_INTERLEAVE does not seem to work as expected for messages > 64KB,
  422. // therefore partial notifications and messages need to be handled separately.
  423. if (flags & MSG_NOTIFICATION) {
  424. // SCTP event notification
  425. mPartialNotification.insert(mPartialNotification.end(), buffer, buffer + len);
  426. if (flags & MSG_EOR) {
  427. // Notification is complete, process it
  428. auto notification =
  429. reinterpret_cast<union sctp_notification *>(mPartialNotification.data());
  430. processNotification(notification, mPartialNotification.size());
  431. mPartialNotification.clear();
  432. }
  433. } else {
  434. // SCTP message
  435. mPartialMessage.insert(mPartialMessage.end(), buffer, buffer + len);
  436. if (flags & MSG_EOR) {
  437. // Message is complete, process it
  438. if (infotype != SCTP_RECVV_RCVINFO)
  439. throw std::runtime_error("Missing SCTP recv info");
  440. processData(std::move(mPartialMessage), info.rcv_sid,
  441. PayloadId(ntohl(info.rcv_ppid)));
  442. mPartialMessage.clear();
  443. }
  444. }
  445. }
  446. } catch (const std::exception &e) {
  447. PLOG_WARNING << e.what();
  448. }
  449. }
  450. void SctpTransport::doFlush() {
  451. std::lock_guard lock(mSendMutex);
  452. --mPendingFlushCount;
  453. try {
  454. trySendQueue();
  455. } catch (const std::exception &e) {
  456. PLOG_WARNING << e.what();
  457. }
  458. }
  459. bool SctpTransport::trySendQueue() {
  460. // Requires mSendMutex to be locked
  461. while (auto next = mSendQueue.peek()) {
  462. message_ptr message = std::move(*next);
  463. if (!trySendMessage(message))
  464. return false;
  465. mSendQueue.pop();
  466. updateBufferedAmount(to_uint16(message->stream), -ptrdiff_t(message_size_func(message)));
  467. }
  468. return true;
  469. }
  470. bool SctpTransport::trySendMessage(message_ptr message) {
  471. // Requires mSendMutex to be locked
  472. if (!mSock || state() != State::Connected)
  473. return false;
  474. uint32_t ppid;
  475. switch (message->type) {
  476. case Message::String:
  477. ppid = !message->empty() ? PPID_STRING : PPID_STRING_EMPTY;
  478. break;
  479. case Message::Binary:
  480. ppid = !message->empty() ? PPID_BINARY : PPID_BINARY_EMPTY;
  481. break;
  482. case Message::Control:
  483. ppid = PPID_CONTROL;
  484. break;
  485. case Message::Reset:
  486. sendReset(uint16_t(message->stream));
  487. return true;
  488. default:
  489. // Ignore
  490. return true;
  491. }
  492. PLOG_VERBOSE << "SCTP try send size=" << message->size();
  493. // TODO: Implement SCTP ndata specification draft when supported everywhere
  494. // See https://tools.ietf.org/html/draft-ietf-tsvwg-sctp-ndata-08
  495. const Reliability reliability = message->reliability ? *message->reliability : Reliability();
  496. struct sctp_sendv_spa spa = {};
  497. // set sndinfo
  498. spa.sendv_flags |= SCTP_SEND_SNDINFO_VALID;
  499. spa.sendv_sndinfo.snd_sid = uint16_t(message->stream);
  500. spa.sendv_sndinfo.snd_ppid = htonl(ppid);
  501. spa.sendv_sndinfo.snd_flags |= SCTP_EOR; // implicit here
  502. // set prinfo
  503. spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
  504. if (reliability.unordered)
  505. spa.sendv_sndinfo.snd_flags |= SCTP_UNORDERED;
  506. switch (reliability.type) {
  507. case Reliability::Type::Rexmit:
  508. spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
  509. spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_RTX;
  510. spa.sendv_prinfo.pr_value = to_uint32(std::get<int>(reliability.rexmit));
  511. break;
  512. case Reliability::Type::Timed:
  513. spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
  514. spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_TTL;
  515. spa.sendv_prinfo.pr_value = to_uint32(std::get<milliseconds>(reliability.rexmit).count());
  516. break;
  517. default:
  518. spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_NONE;
  519. break;
  520. }
  521. ssize_t ret;
  522. if (!message->empty()) {
  523. ret = usrsctp_sendv(mSock, message->data(), message->size(), nullptr, 0, &spa, sizeof(spa),
  524. SCTP_SENDV_SPA, 0);
  525. } else {
  526. const char zero = 0;
  527. ret = usrsctp_sendv(mSock, &zero, 1, nullptr, 0, &spa, sizeof(spa), SCTP_SENDV_SPA, 0);
  528. }
  529. if (ret < 0) {
  530. if (errno == EWOULDBLOCK || errno == EAGAIN) {
  531. PLOG_VERBOSE << "SCTP sending not possible";
  532. return false;
  533. }
  534. PLOG_ERROR << "SCTP sending failed, errno=" << errno;
  535. throw std::runtime_error("Sending failed, errno=" + std::to_string(errno));
  536. }
  537. PLOG_VERBOSE << "SCTP sent size=" << message->size();
  538. if (message->type == Message::Binary || message->type == Message::String)
  539. mBytesSent += message->size();
  540. return true;
  541. }
  542. void SctpTransport::updateBufferedAmount(uint16_t streamId, ptrdiff_t delta) {
  543. // Requires mSendMutex to be locked
  544. if (delta == 0)
  545. return;
  546. auto it = mBufferedAmount.insert(std::make_pair(streamId, 0)).first;
  547. size_t amount = size_t(std::max(ptrdiff_t(it->second) + delta, ptrdiff_t(0)));
  548. if (amount == 0)
  549. mBufferedAmount.erase(it);
  550. else
  551. it->second = amount;
  552. // Synchronously call the buffered amount callback
  553. triggerBufferedAmount(streamId, amount);
  554. }
  555. void SctpTransport::triggerBufferedAmount(uint16_t streamId, size_t amount) {
  556. try {
  557. mBufferedAmountCallback(streamId, amount);
  558. } catch (const std::exception &e) {
  559. PLOG_WARNING << "SCTP buffered amount callback: " << e.what();
  560. }
  561. }
  562. void SctpTransport::sendReset(uint16_t streamId) {
  563. // Requires mSendMutex to be locked
  564. if (!mSock || state() != State::Connected)
  565. return;
  566. PLOG_DEBUG << "SCTP resetting stream " << streamId;
  567. using srs_t = struct sctp_reset_streams;
  568. const size_t len = sizeof(srs_t) + sizeof(uint16_t);
  569. byte buffer[len] = {};
  570. srs_t &srs = *reinterpret_cast<srs_t *>(buffer);
  571. srs.srs_flags = SCTP_STREAM_RESET_OUTGOING;
  572. srs.srs_number_streams = 1;
  573. srs.srs_stream_list[0] = streamId;
  574. mWritten = false;
  575. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_RESET_STREAMS, &srs, len) == 0) {
  576. std::unique_lock lock(mWriteMutex); // locking before setsockopt might deadlock usrsctp...
  577. mWrittenCondition.wait_for(lock, 1000ms,
  578. [&]() { return mWritten || state() != State::Connected; });
  579. } else if (errno == EINVAL) {
  580. PLOG_DEBUG << "SCTP stream " << streamId << " already reset";
  581. } else {
  582. PLOG_WARNING << "SCTP reset stream " << streamId << " failed, errno=" << errno;
  583. }
  584. }
  585. void SctpTransport::handleUpcall() {
  586. if (!mSock)
  587. return;
  588. PLOG_VERBOSE << "Handle upcall";
  589. int events = usrsctp_get_events(mSock);
  590. if (events & SCTP_EVENT_READ && mPendingRecvCount == 0) {
  591. ++mPendingRecvCount;
  592. mProcessor.enqueue(&SctpTransport::doRecv, this);
  593. }
  594. if (events & SCTP_EVENT_WRITE && mPendingFlushCount == 0) {
  595. ++mPendingFlushCount;
  596. mProcessor.enqueue(&SctpTransport::doFlush, this);
  597. }
  598. }
  599. int SctpTransport::handleWrite(byte *data, size_t len, uint8_t /*tos*/, uint8_t /*set_df*/) {
  600. try {
  601. std::unique_lock lock(mWriteMutex);
  602. PLOG_VERBOSE << "Handle write, len=" << len;
  603. if (!outgoing(make_message(data, data + len)))
  604. return -1;
  605. mWritten = true;
  606. mWrittenOnce = true;
  607. mWrittenCondition.notify_all();
  608. } catch (const std::exception &e) {
  609. PLOG_ERROR << "SCTP write: " << e.what();
  610. return -1;
  611. }
  612. return 0; // success
  613. }
  614. void SctpTransport::processData(binary &&data, uint16_t sid, PayloadId ppid) {
  615. PLOG_VERBOSE << "Process data, size=" << data.size();
  616. // RFC 8831: The usage of the PPIDs "WebRTC String Partial" and "WebRTC Binary Partial" is
  617. // deprecated. They were used for a PPID-based fragmentation and reassembly of user messages
  618. // belonging to reliable and ordered data channels.
  619. // See https://tools.ietf.org/html/rfc8831#section-6.6
  620. // We handle those PPIDs at reception for compatibility reasons but shall never send them.
  621. switch (ppid) {
  622. case PPID_CONTROL:
  623. recv(make_message(std::move(data), Message::Control, sid));
  624. break;
  625. case PPID_STRING_PARTIAL: // deprecated
  626. mPartialStringData.insert(mPartialStringData.end(), data.begin(), data.end());
  627. break;
  628. case PPID_STRING:
  629. if (mPartialStringData.empty()) {
  630. mBytesReceived += data.size();
  631. recv(make_message(std::move(data), Message::String, sid));
  632. } else {
  633. mPartialStringData.insert(mPartialStringData.end(), data.begin(), data.end());
  634. mBytesReceived += mPartialStringData.size();
  635. recv(make_message(std::move(mPartialStringData), Message::String, sid));
  636. mPartialStringData.clear();
  637. }
  638. break;
  639. case PPID_STRING_EMPTY:
  640. recv(make_message(std::move(mPartialStringData), Message::String, sid));
  641. mPartialStringData.clear();
  642. break;
  643. case PPID_BINARY_PARTIAL: // deprecated
  644. mPartialBinaryData.insert(mPartialBinaryData.end(), data.begin(), data.end());
  645. break;
  646. case PPID_BINARY:
  647. if (mPartialBinaryData.empty()) {
  648. mBytesReceived += data.size();
  649. recv(make_message(std::move(data), Message::Binary, sid));
  650. } else {
  651. mPartialBinaryData.insert(mPartialBinaryData.end(), data.begin(), data.end());
  652. mBytesReceived += mPartialBinaryData.size();
  653. recv(make_message(std::move(mPartialBinaryData), Message::Binary, sid));
  654. mPartialBinaryData.clear();
  655. }
  656. break;
  657. case PPID_BINARY_EMPTY:
  658. recv(make_message(std::move(mPartialBinaryData), Message::Binary, sid));
  659. mPartialBinaryData.clear();
  660. break;
  661. default:
  662. // Unknown
  663. COUNTER_UNKNOWN_PPID++;
  664. PLOG_VERBOSE << "Unknown PPID: " << uint32_t(ppid);
  665. return;
  666. }
  667. }
  668. void SctpTransport::processNotification(const union sctp_notification *notify, size_t len) {
  669. if (len != size_t(notify->sn_header.sn_length)) {
  670. COUNTER_BAD_NOTIF_LEN++;
  671. return;
  672. }
  673. auto type = notify->sn_header.sn_type;
  674. PLOG_VERBOSE << "Processing notification, type=" << type;
  675. switch (type) {
  676. case SCTP_ASSOC_CHANGE: {
  677. const struct sctp_assoc_change &assoc_change = notify->sn_assoc_change;
  678. if (assoc_change.sac_state == SCTP_COMM_UP) {
  679. PLOG_INFO << "SCTP connected";
  680. changeState(State::Connected);
  681. } else {
  682. if (state() == State::Connecting) {
  683. PLOG_ERROR << "SCTP connection failed";
  684. changeState(State::Failed);
  685. } else {
  686. PLOG_INFO << "SCTP disconnected";
  687. changeState(State::Disconnected);
  688. }
  689. mWrittenCondition.notify_all();
  690. }
  691. break;
  692. }
  693. case SCTP_SENDER_DRY_EVENT: {
  694. PLOG_VERBOSE << "SCTP dry event";
  695. // It should not be necessary since the send callback should have been called already,
  696. // but to be sure, let's try to send now.
  697. flush();
  698. break;
  699. }
  700. case SCTP_STREAM_RESET_EVENT: {
  701. const struct sctp_stream_reset_event &reset_event = notify->sn_strreset_event;
  702. const int count = (reset_event.strreset_length - sizeof(reset_event)) / sizeof(uint16_t);
  703. const uint16_t flags = reset_event.strreset_flags;
  704. IF_PLOG(plog::verbose) {
  705. std::ostringstream desc;
  706. desc << "flags=";
  707. if (flags & SCTP_STREAM_RESET_OUTGOING_SSN && flags & SCTP_STREAM_RESET_INCOMING_SSN)
  708. desc << "outgoing|incoming";
  709. else if (flags & SCTP_STREAM_RESET_OUTGOING_SSN)
  710. desc << "outgoing";
  711. else if (flags & SCTP_STREAM_RESET_INCOMING_SSN)
  712. desc << "incoming";
  713. else
  714. desc << "0";
  715. desc << ", streams=[";
  716. for (int i = 0; i < count; ++i) {
  717. uint16_t streamId = reset_event.strreset_stream_list[i];
  718. desc << (i != 0 ? "," : "") << streamId;
  719. }
  720. desc << "]";
  721. PLOG_VERBOSE << "SCTP reset event, " << desc.str();
  722. }
  723. // RFC 8831 6.7. Closing a Data Channel
  724. // If one side decides to close the data channel, it resets the corresponding outgoing
  725. // stream. When the peer sees that an incoming stream was reset, it also resets its
  726. // corresponding outgoing stream.
  727. // See https://tools.ietf.org/html/rfc8831#section-6.7
  728. if (flags & SCTP_STREAM_RESET_INCOMING_SSN) {
  729. const byte dataChannelCloseMessage{0x04};
  730. for (int i = 0; i < count; ++i) {
  731. uint16_t streamId = reset_event.strreset_stream_list[i];
  732. recv(make_message(&dataChannelCloseMessage, &dataChannelCloseMessage + 1,
  733. Message::Control, streamId));
  734. }
  735. }
  736. break;
  737. }
  738. default:
  739. // Ignore
  740. break;
  741. }
  742. }
  743. void SctpTransport::clearStats() {
  744. mBytesReceived = 0;
  745. mBytesSent = 0;
  746. }
  747. size_t SctpTransport::bytesSent() { return mBytesSent; }
  748. size_t SctpTransport::bytesReceived() { return mBytesReceived; }
  749. optional<milliseconds> SctpTransport::rtt() {
  750. if (!mSock || state() != State::Connected)
  751. return nullopt;
  752. struct sctp_status status = {};
  753. socklen_t len = sizeof(status);
  754. if (usrsctp_getsockopt(mSock, IPPROTO_SCTP, SCTP_STATUS, &status, &len)) {
  755. COUNTER_BAD_SCTP_STATUS++;
  756. return nullopt;
  757. }
  758. return milliseconds(status.sstat_primary.spinfo_srtt);
  759. }
  760. void SctpTransport::UpcallCallback(struct socket *, void *arg, int /* flags */) {
  761. auto *transport = static_cast<SctpTransport *>(arg);
  762. if (auto locked = Instances->lock(transport))
  763. transport->handleUpcall();
  764. }
  765. int SctpTransport::WriteCallback(void *ptr, void *data, size_t len, uint8_t tos, uint8_t set_df) {
  766. auto *transport = static_cast<SctpTransport *>(ptr);
  767. // Set the CRC32 ourselves as we have enabled CRC32 offloading
  768. if (len >= 12) {
  769. uint32_t *checksum = reinterpret_cast<uint32_t *>(data) + 2;
  770. *checksum = 0;
  771. *checksum = usrsctp_crc32c(data, len);
  772. }
  773. // Workaround for sctplab/usrsctp#405: Send callback is invoked on already closed socket
  774. // https://github.com/sctplab/usrsctp/issues/405
  775. if (auto locked = Instances->lock(transport))
  776. return transport->handleWrite(static_cast<byte *>(data), len, tos, set_df);
  777. else
  778. return -1;
  779. }
  780. void SctpTransport::DebugCallback(const char *format, ...) {
  781. const size_t bufferSize = 1024;
  782. char buffer[bufferSize];
  783. va_list va;
  784. va_start(va, format);
  785. int len = std::vsnprintf(buffer, bufferSize, format, va);
  786. va_end(va);
  787. if (len <= 0)
  788. return;
  789. len = std::min(len, int(bufferSize - 1));
  790. buffer[len - 1] = '\0'; // remove newline
  791. PLOG_VERBOSE << "usrsctp: " << buffer; // usrsctp debug as verbose
  792. }
  793. } // namespace rtc::impl