sctptransport.cpp 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991
  1. /**
  2. * Copyright (c) 2019 Paul-Louis Ageneau
  3. *
  4. * This Source Code Form is subject to the terms of the Mozilla Public
  5. * License, v. 2.0. If a copy of the MPL was not distributed with this
  6. * file, You can obtain one at https://mozilla.org/MPL/2.0/.
  7. */
  8. #include "sctptransport.hpp"
  9. #include "dtlstransport.hpp"
  10. #include "internals.hpp"
  11. #include "logcounter.hpp"
  12. #include "utils.hpp"
  13. #include <algorithm>
  14. #include <chrono>
  15. #include <cstdarg>
  16. #include <cstdio>
  17. #include <exception>
  18. #include <iostream>
  19. #include <limits>
  20. #include <shared_mutex>
  21. #include <thread>
  22. #include <unordered_set>
  23. #include <vector>
  24. // RFC 8831: SCTP MUST support performing Path MTU discovery without relying on ICMP or ICMPv6 as
  25. // specified in [RFC4821] by using probing messages specified in [RFC4820].
  26. // See https://www.rfc-editor.org/rfc/rfc8831.html#section-5
  27. //
  28. // However, usrsctp does not implement Path MTU discovery, so we need to disable it for now.
  29. // See https://github.com/sctplab/usrsctp/issues/205
  30. #define USE_PMTUD 0
  31. // TODO: When Path MTU discovery is supported, it needs to be enabled with libjuice as ICE backend
  32. // on all platforms except Mac OS where the Don't Fragment (DF) flag can't be set:
  33. /*
  34. #if !USE_NICE
  35. #ifndef __APPLE__
  36. // libjuice enables Linux path MTU discovery or sets the DF flag
  37. #define USE_PMTUD 1
  38. #else
  39. // Setting the DF flag is not available on Mac OS
  40. #define USE_PMTUD 0
  41. #endif
  42. #else // USE_NICE == 1
  43. #define USE_PMTUD 0
  44. #endif
  45. */
  46. using namespace std::chrono_literals;
  47. using namespace std::chrono;
  48. namespace rtc::impl {
  49. using utils::to_uint16;
  50. using utils::to_uint32;
  51. static LogCounter COUNTER_UNKNOWN_PPID(plog::warning,
  52. "Number of SCTP packets received with an unknown PPID");
  53. class SctpTransport::InstancesSet {
  54. public:
  55. void insert(SctpTransport *instance) {
  56. std::unique_lock lock(mMutex);
  57. mSet.insert(instance);
  58. }
  59. void erase(SctpTransport *instance) {
  60. std::unique_lock lock(mMutex);
  61. mSet.erase(instance);
  62. }
  63. using shared_lock = std::shared_lock<std::shared_mutex>;
  64. optional<shared_lock> lock(SctpTransport *instance) noexcept {
  65. shared_lock lock(mMutex);
  66. return mSet.find(instance) != mSet.end() ? std::make_optional(std::move(lock)) : nullopt;
  67. }
  68. private:
  69. std::unordered_set<SctpTransport *> mSet;
  70. std::shared_mutex mMutex;
  71. };
  72. SctpTransport::InstancesSet *SctpTransport::Instances = new InstancesSet;
  73. void SctpTransport::Init() {
  74. usrsctp_init(0, SctpTransport::WriteCallback, SctpTransport::DebugCallback);
  75. usrsctp_sysctl_set_sctp_pr_enable(1); // Enable Partial Reliability Extension (RFC 3758)
  76. usrsctp_sysctl_set_sctp_ecn_enable(0); // Disable Explicit Congestion Notification
  77. #ifdef SCTP_DEBUG
  78. usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_ALL);
  79. #endif
  80. }
  81. void SctpTransport::SetSettings(const SctpSettings &s) {
  82. // The send and receive window size of usrsctp is 256KiB, which is too small for realistic RTTs,
  83. // therefore we increase it to 1MiB by default for better performance.
  84. // See https://bugzilla.mozilla.org/show_bug.cgi?id=1051685
  85. usrsctp_sysctl_set_sctp_recvspace(to_uint32(s.recvBufferSize.value_or(1024 * 1024)));
  86. usrsctp_sysctl_set_sctp_sendspace(to_uint32(s.sendBufferSize.value_or(1024 * 1024)));
  87. // Increase maximum chunks number on queue to 10K by default
  88. usrsctp_sysctl_set_sctp_max_chunks_on_queue(to_uint32(s.maxChunksOnQueue.value_or(10 * 1024)));
  89. // Increase initial congestion window size to 10 MTUs (RFC 6928) by default
  90. usrsctp_sysctl_set_sctp_initial_cwnd(to_uint32(s.initialCongestionWindow.value_or(10)));
  91. // Set max burst to 10 MTUs by default (max burst is initially 0, meaning disabled)
  92. usrsctp_sysctl_set_sctp_max_burst_default(to_uint32(s.maxBurst.value_or(10)));
  93. // Use standard SCTP congestion control (RFC 4960) by default
  94. // See https://github.com/paullouisageneau/libdatachannel/issues/354
  95. usrsctp_sysctl_set_sctp_default_cc_module(to_uint32(s.congestionControlModule.value_or(0)));
  96. // Reduce SACK delay to 20ms by default (the recommended default value from RFC 4960 is 200ms)
  97. usrsctp_sysctl_set_sctp_delayed_sack_time_default(
  98. to_uint32(s.delayedSackTime.value_or(20ms).count()));
  99. // RTO settings
  100. // RFC 2988 recommends a 1s min RTO, which is very high, but TCP on Linux has a 200ms min RTO
  101. usrsctp_sysctl_set_sctp_rto_min_default(
  102. to_uint32(s.minRetransmitTimeout.value_or(200ms).count()));
  103. // Set only 10s as max RTO instead of 60s for shorter connection timeout
  104. usrsctp_sysctl_set_sctp_rto_max_default(
  105. to_uint32(s.maxRetransmitTimeout.value_or(10000ms).count()));
  106. usrsctp_sysctl_set_sctp_init_rto_max_default(
  107. to_uint32(s.maxRetransmitTimeout.value_or(10000ms).count()));
  108. // Still set 1s as initial RTO
  109. usrsctp_sysctl_set_sctp_rto_initial_default(
  110. to_uint32(s.initialRetransmitTimeout.value_or(1000ms).count()));
  111. // RTX settings
  112. // 5 retransmissions instead of 8 to shorten the backoff for shorter connection timeout
  113. auto maxRtx = to_uint32(s.maxRetransmitAttempts.value_or(5));
  114. usrsctp_sysctl_set_sctp_init_rtx_max_default(maxRtx);
  115. usrsctp_sysctl_set_sctp_assoc_rtx_max_default(maxRtx);
  116. usrsctp_sysctl_set_sctp_path_rtx_max_default(maxRtx); // single path
  117. // Heartbeat interval
  118. usrsctp_sysctl_set_sctp_heartbeat_interval_default(
  119. to_uint32(s.heartbeatInterval.value_or(10000ms).count()));
  120. }
  121. void SctpTransport::Cleanup() {
  122. while (usrsctp_finish())
  123. std::this_thread::sleep_for(100ms);
  124. }
  125. SctpTransport::SctpTransport(shared_ptr<Transport> lower, const Configuration &config, Ports ports,
  126. message_callback recvCallback, amount_callback bufferedAmountCallback,
  127. state_callback stateChangeCallback)
  128. : Transport(lower, std::move(stateChangeCallback)),
  129. mMaxMessageSize(config.maxMessageSize.value_or(DEFAULT_LOCAL_MAX_MESSAGE_SIZE)),
  130. mPorts(std::move(ports)), mSendQueue(0, message_size_func),
  131. mBufferedAmountCallback(std::move(bufferedAmountCallback)) {
  132. onRecv(std::move(recvCallback));
  133. PLOG_DEBUG << "Initializing SCTP transport";
  134. mSock = usrsctp_socket(AF_CONN, SOCK_STREAM, IPPROTO_SCTP, nullptr, nullptr, 0, nullptr);
  135. if (!mSock)
  136. throw std::runtime_error("Could not create SCTP socket, errno=" + std::to_string(errno));
  137. usrsctp_set_upcall(mSock, &SctpTransport::UpcallCallback, this);
  138. if (usrsctp_set_non_blocking(mSock, 1))
  139. throw std::runtime_error("Unable to set non-blocking mode, errno=" + std::to_string(errno));
  140. // SCTP must stop sending after the lower layer is shut down, so disable linger
  141. struct linger sol = {};
  142. sol.l_onoff = 1;
  143. sol.l_linger = 0;
  144. if (usrsctp_setsockopt(mSock, SOL_SOCKET, SO_LINGER, &sol, sizeof(sol)))
  145. throw std::runtime_error("Could not set socket option SO_LINGER, errno=" +
  146. std::to_string(errno));
  147. struct sctp_assoc_value av = {};
  148. av.assoc_id = SCTP_ALL_ASSOC;
  149. av.assoc_value = 1;
  150. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, &av, sizeof(av)))
  151. throw std::runtime_error("Could not set socket option SCTP_ENABLE_STREAM_RESET, errno=" +
  152. std::to_string(errno));
  153. int on = 1;
  154. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_RECVRCVINFO, &on, sizeof(on)))
  155. throw std::runtime_error("Could set socket option SCTP_RECVRCVINFO, errno=" +
  156. std::to_string(errno));
  157. struct sctp_event se = {};
  158. se.se_assoc_id = SCTP_ALL_ASSOC;
  159. se.se_on = 1;
  160. se.se_type = SCTP_ASSOC_CHANGE;
  161. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_EVENT, &se, sizeof(se)))
  162. throw std::runtime_error("Could not subscribe to event SCTP_ASSOC_CHANGE, errno=" +
  163. std::to_string(errno));
  164. se.se_type = SCTP_SENDER_DRY_EVENT;
  165. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_EVENT, &se, sizeof(se)))
  166. throw std::runtime_error("Could not subscribe to event SCTP_SENDER_DRY_EVENT, errno=" +
  167. std::to_string(errno));
  168. se.se_type = SCTP_STREAM_RESET_EVENT;
  169. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_EVENT, &se, sizeof(se)))
  170. throw std::runtime_error("Could not subscribe to event SCTP_STREAM_RESET_EVENT, errno=" +
  171. std::to_string(errno));
  172. // RFC 8831 6.6. Transferring User Data on a Data Channel
  173. // The sender SHOULD disable the Nagle algorithm (see [RFC1122) to minimize the latency
  174. // See https://www.rfc-editor.org/rfc/rfc8831.html#section-6.6
  175. int nodelay = 1;
  176. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_NODELAY, &nodelay, sizeof(nodelay)))
  177. throw std::runtime_error("Could not set socket option SCTP_NODELAY, errno=" +
  178. std::to_string(errno));
  179. struct sctp_paddrparams spp = {};
  180. // Enable SCTP heartbeats
  181. spp.spp_flags = SPP_HB_ENABLE;
  182. // RFC 8261 5. DTLS considerations:
  183. // If path MTU discovery is performed by the SCTP layer and IPv4 is used as the network-layer
  184. // protocol, the DTLS implementation SHOULD allow the DTLS user to enforce that the
  185. // corresponding IPv4 packet is sent with the Don't Fragment (DF) bit set. If controlling the DF
  186. // bit is not possible (for example, due to implementation restrictions), a safe value for the
  187. // path MTU has to be used by the SCTP stack. It is RECOMMENDED that the safe value not exceed
  188. // 1200 bytes.
  189. // See https://www.rfc-editor.org/rfc/rfc8261.html#section-5
  190. #if USE_PMTUD
  191. if (!config.mtu.has_value()) {
  192. #else
  193. if (false) {
  194. #endif
  195. // Enable SCTP path MTU discovery
  196. spp.spp_flags |= SPP_PMTUD_ENABLE;
  197. PLOG_VERBOSE << "Path MTU discovery enabled";
  198. } else {
  199. // Fall back to a safe MTU value.
  200. spp.spp_flags |= SPP_PMTUD_DISABLE;
  201. // The MTU value provided specifies the space available for chunks in the
  202. // packet, so we also subtract the SCTP header size.
  203. size_t pmtu = config.mtu.value_or(DEFAULT_MTU) - 12 - 48 - 8 - 40; // SCTP/DTLS/UDP/IPv6
  204. spp.spp_pathmtu = to_uint32(pmtu);
  205. PLOG_VERBOSE << "Path MTU discovery disabled, SCTP MTU set to " << pmtu;
  206. }
  207. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS, &spp, sizeof(spp)))
  208. throw std::runtime_error("Could not set socket option SCTP_PEER_ADDR_PARAMS, errno=" +
  209. std::to_string(errno));
  210. // RFC 8831 6.2. SCTP Association Management
  211. // The number of streams negotiated during SCTP association setup SHOULD be 65535, which is the
  212. // maximum number of streams that can be negotiated during the association setup.
  213. // See https://www.rfc-editor.org/rfc/rfc8831.html#section-6.2
  214. // However, usrsctp allocates tables to hold the stream states. For 65535 streams, it results in
  215. // the waste of a few MBs for each association. Therefore, we use a lower limit to save memory.
  216. // See https://github.com/sctplab/usrsctp/issues/121
  217. struct sctp_initmsg sinit = {};
  218. sinit.sinit_num_ostreams = MAX_SCTP_STREAMS_COUNT;
  219. sinit.sinit_max_instreams = MAX_SCTP_STREAMS_COUNT;
  220. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_INITMSG, &sinit, sizeof(sinit)))
  221. throw std::runtime_error("Could not set socket option SCTP_INITMSG, errno=" +
  222. std::to_string(errno));
  223. // Prevent fragmented interleave of messages (i.e. level 0), see RFC 6458 section 8.1.20.
  224. // Unless the user has set the fragmentation interleave level to 0, notifications
  225. // may also be interleaved with partially delivered messages.
  226. int level = 0;
  227. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_FRAGMENT_INTERLEAVE, &level, sizeof(level)))
  228. throw std::runtime_error("Could not disable SCTP fragmented interleave, errno=" +
  229. std::to_string(errno));
  230. // When using SCTP over DTLS, the data integrity is ensured by DTLS. Therefore, there's no need
  231. // to check CRC32c additionally when receiving.
  232. // See https://datatracker.ietf.org/doc/html/draft-ietf-tsvwg-sctp-zero-checksum
  233. int edmid = SCTP_EDMID_LOWER_LAYER_DTLS;
  234. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_ACCEPT_ZERO_CHECKSUM, &edmid, sizeof(edmid)))
  235. throw std::runtime_error("Could set socket option SCTP_ACCEPT_ZERO_CHECKSUM, errno=" +
  236. std::to_string(errno));
  237. int rcvBuf = 0;
  238. socklen_t rcvBufLen = sizeof(rcvBuf);
  239. if (usrsctp_getsockopt(mSock, SOL_SOCKET, SO_RCVBUF, &rcvBuf, &rcvBufLen))
  240. throw std::runtime_error("Could not get SCTP recv buffer size, errno=" +
  241. std::to_string(errno));
  242. int sndBuf = 0;
  243. socklen_t sndBufLen = sizeof(sndBuf);
  244. if (usrsctp_getsockopt(mSock, SOL_SOCKET, SO_SNDBUF, &sndBuf, &sndBufLen))
  245. throw std::runtime_error("Could not get SCTP send buffer size, errno=" +
  246. std::to_string(errno));
  247. // Ensure the buffer is also large enough to accomodate the largest messages
  248. const int minBuf = int(std::min(mMaxMessageSize, size_t(std::numeric_limits<int>::max())));
  249. rcvBuf = std::max(rcvBuf, minBuf);
  250. sndBuf = std::max(sndBuf, minBuf);
  251. if (usrsctp_setsockopt(mSock, SOL_SOCKET, SO_RCVBUF, &rcvBuf, sizeof(rcvBuf)))
  252. throw std::runtime_error("Could not set SCTP recv buffer size, errno=" +
  253. std::to_string(errno));
  254. if (usrsctp_setsockopt(mSock, SOL_SOCKET, SO_SNDBUF, &sndBuf, sizeof(sndBuf)))
  255. throw std::runtime_error("Could not set SCTP send buffer size, errno=" +
  256. std::to_string(errno));
  257. usrsctp_register_address(this);
  258. Instances->insert(this);
  259. }
  260. SctpTransport::~SctpTransport() {
  261. PLOG_DEBUG << "Destroying SCTP transport";
  262. mProcessor.join(); // if we are here, the processor must be empty
  263. // Before unregistering incoming() from the lower layer, we need to make sure the thread from
  264. // lower layers is not blocked in incoming() by the WrittenOnce condition.
  265. mWrittenOnce = true;
  266. mWrittenCondition.notify_all();
  267. unregisterIncoming();
  268. usrsctp_close(mSock);
  269. usrsctp_deregister_address(this);
  270. Instances->erase(this);
  271. }
  272. void SctpTransport::onBufferedAmount(amount_callback callback) {
  273. mBufferedAmountCallback = std::move(callback);
  274. }
  275. void SctpTransport::start() {
  276. registerIncoming();
  277. connect();
  278. }
  279. void SctpTransport::stop() { close(); }
  280. struct sockaddr_conn SctpTransport::getSockAddrConn(uint16_t port) {
  281. struct sockaddr_conn sconn = {};
  282. sconn.sconn_family = AF_CONN;
  283. sconn.sconn_port = htons(port);
  284. sconn.sconn_addr = this;
  285. #ifdef HAVE_SCONN_LEN
  286. sconn.sconn_len = sizeof(sconn);
  287. #endif
  288. return sconn;
  289. }
  290. void SctpTransport::connect() {
  291. PLOG_DEBUG << "SCTP connecting (local port=" << mPorts.local
  292. << ", remote port=" << mPorts.remote << ")";
  293. changeState(State::Connecting);
  294. auto local = getSockAddrConn(mPorts.local);
  295. if (usrsctp_bind(mSock, reinterpret_cast<struct sockaddr *>(&local), sizeof(local)))
  296. throw std::runtime_error("Could not bind usrsctp socket, errno=" + std::to_string(errno));
  297. // According to RFC 8841, both endpoints must initiate the SCTP association, in a
  298. // simultaneous-open manner, irrelevent to the SDP setup role.
  299. // See https://www.rfc-editor.org/rfc/rfc8841.html#section-9.3
  300. auto remote = getSockAddrConn(mPorts.remote);
  301. int ret = usrsctp_connect(mSock, reinterpret_cast<struct sockaddr *>(&remote), sizeof(remote));
  302. if (ret && errno != EINPROGRESS)
  303. throw std::runtime_error("Connection attempt failed, errno=" + std::to_string(errno));
  304. }
  305. bool SctpTransport::send(message_ptr message) {
  306. std::lock_guard lock(mSendMutex);
  307. if (state() != State::Connected)
  308. return false;
  309. if (!message)
  310. return trySendQueue();
  311. PLOG_VERBOSE << "Send size=" << message->size();
  312. if (message->size() > mMaxMessageSize)
  313. throw std::invalid_argument("Message is too large");
  314. // Flush the queue, and if nothing is pending, try to send directly
  315. if (trySendQueue() && trySendMessage(message))
  316. return true;
  317. mSendQueue.push(message);
  318. updateBufferedAmount(to_uint16(message->stream), ptrdiff_t(message_size_func(message)));
  319. return false;
  320. }
  321. bool SctpTransport::flush() {
  322. try {
  323. std::lock_guard lock(mSendMutex);
  324. if (state() != State::Connected)
  325. return false;
  326. trySendQueue();
  327. return true;
  328. } catch (const std::exception &e) {
  329. PLOG_WARNING << "SCTP flush: " << e.what();
  330. return false;
  331. }
  332. }
  333. void SctpTransport::closeStream(unsigned int stream) {
  334. std::lock_guard lock(mSendMutex);
  335. // RFC 8831 6.7. Closing a Data Channel
  336. // Closing of a data channel MUST be signaled by resetting the corresponding outgoing streams
  337. // See https://www.rfc-editor.org/rfc/rfc8831.html#section-6.7
  338. mSendQueue.push(make_message(0, Message::Reset, to_uint16(stream)));
  339. // This method must not call the buffered callback synchronously
  340. mProcessor.enqueue(&SctpTransport::flush, shared_from_this());
  341. }
  342. void SctpTransport::close() {
  343. mSendQueue.stop();
  344. if (state() == State::Connected) {
  345. mProcessor.enqueue(&SctpTransport::flush, shared_from_this());
  346. } else if (state() == State::Connecting) {
  347. PLOG_DEBUG << "SCTP early shutdown";
  348. if (usrsctp_shutdown(mSock, SHUT_RDWR)) {
  349. if (errno == ENOTCONN) {
  350. PLOG_VERBOSE << "SCTP already shut down";
  351. } else {
  352. PLOG_WARNING << "SCTP shutdown failed, errno=" << errno;
  353. }
  354. }
  355. changeState(State::Failed);
  356. mWrittenCondition.notify_all();
  357. }
  358. }
  359. unsigned int SctpTransport::maxStream() const {
  360. unsigned int streamsCount = mNegotiatedStreamsCount.value_or(MAX_SCTP_STREAMS_COUNT);
  361. return streamsCount > 0 ? streamsCount - 1 : 0;
  362. }
  363. void SctpTransport::incoming(message_ptr message) {
  364. // There could be a race condition here where we receive the remote INIT before the local one is
  365. // sent, which would result in the connection being aborted. Therefore, we need to wait for data
  366. // to be sent on our side (i.e. the local INIT) before proceeding.
  367. if (!mWrittenOnce) { // test the atomic boolean is not set first to prevent a lock contention
  368. std::unique_lock lock(mWriteMutex);
  369. mWrittenCondition.wait(lock, [&]() { return mWrittenOnce || state() == State::Failed; });
  370. }
  371. if (state() == State::Failed)
  372. return;
  373. if (!message) {
  374. PLOG_INFO << "SCTP disconnected";
  375. changeState(State::Disconnected);
  376. recv(nullptr);
  377. return;
  378. }
  379. PLOG_VERBOSE << "Incoming size=" << message->size();
  380. usrsctp_conninput(this, message->data(), message->size(), 0);
  381. }
  382. bool SctpTransport::outgoing(message_ptr message) {
  383. // Set recommended medium-priority DSCP value
  384. // See https://www.rfc-editor.org/rfc/rfc8837.html#section-5
  385. message->dscp = 10; // AF11: Assured Forwarding class 1, low drop probability
  386. return Transport::outgoing(std::move(message));
  387. }
  388. void SctpTransport::doRecv() {
  389. std::lock_guard lock(mRecvMutex);
  390. --mPendingRecvCount;
  391. try {
  392. while (state() != State::Disconnected && state() != State::Failed) {
  393. const size_t bufferSize = 65536;
  394. byte buffer[bufferSize];
  395. socklen_t fromlen = 0;
  396. struct sctp_rcvinfo info = {};
  397. socklen_t infolen = sizeof(info);
  398. unsigned int infotype = 0;
  399. int flags = 0;
  400. ssize_t len = usrsctp_recvv(mSock, buffer, bufferSize, nullptr, &fromlen, &info,
  401. &infolen, &infotype, &flags);
  402. if (len < 0) {
  403. if (errno == EWOULDBLOCK || errno == EAGAIN || errno == ECONNRESET)
  404. break;
  405. else
  406. throw std::runtime_error("SCTP recv failed, errno=" + std::to_string(errno));
  407. } else if (len == 0) {
  408. break;
  409. }
  410. PLOG_VERBOSE << "SCTP recv, len=" << len;
  411. // SCTP_FRAGMENT_INTERLEAVE does not seem to work as expected for messages > 64KB,
  412. // therefore partial notifications and messages need to be handled separately.
  413. if (flags & MSG_NOTIFICATION) {
  414. // SCTP event notification
  415. mPartialNotification.insert(mPartialNotification.end(), buffer, buffer + len);
  416. if (flags & MSG_EOR) {
  417. // Notification is complete, process it
  418. binary notification;
  419. mPartialNotification.swap(notification);
  420. auto n = reinterpret_cast<union sctp_notification *>(notification.data());
  421. processNotification(n, notification.size());
  422. }
  423. } else {
  424. // SCTP message
  425. mPartialMessage.insert(mPartialMessage.end(), buffer, buffer + len);
  426. if (mPartialMessage.size() > mMaxMessageSize) {
  427. PLOG_WARNING << "SCTP message is too large, truncating it";
  428. mPartialMessage.resize(mMaxMessageSize);
  429. }
  430. if (flags & MSG_EOR) {
  431. // Message is complete, process it
  432. binary message;
  433. mPartialMessage.swap(message);
  434. if (infotype != SCTP_RECVV_RCVINFO)
  435. throw std::runtime_error("Missing SCTP recv info");
  436. processData(std::move(message), info.rcv_sid, PayloadId(ntohl(info.rcv_ppid)));
  437. }
  438. }
  439. }
  440. } catch (const std::exception &e) {
  441. PLOG_WARNING << e.what();
  442. }
  443. }
  444. void SctpTransport::doFlush() {
  445. std::lock_guard lock(mSendMutex);
  446. --mPendingFlushCount;
  447. try {
  448. trySendQueue();
  449. } catch (const std::exception &e) {
  450. PLOG_WARNING << e.what();
  451. }
  452. }
  453. void SctpTransport::enqueueRecv() {
  454. if (mPendingRecvCount > 0)
  455. return;
  456. if (auto shared_this = weak_from_this().lock()) {
  457. // This is called from the upcall callback, we must not release the shared ptr here
  458. ++mPendingRecvCount;
  459. mProcessor.enqueue(&SctpTransport::doRecv, std::move(shared_this));
  460. }
  461. }
  462. void SctpTransport::enqueueFlush() {
  463. if (mPendingFlushCount > 0)
  464. return;
  465. if (auto shared_this = weak_from_this().lock()) {
  466. // This is called from the upcall callback, we must not release the shared ptr here
  467. ++mPendingFlushCount;
  468. mProcessor.enqueue(&SctpTransport::doFlush, std::move(shared_this));
  469. }
  470. }
  471. bool SctpTransport::trySendQueue() {
  472. // Requires mSendMutex to be locked
  473. while (auto next = mSendQueue.peek()) {
  474. message_ptr message = std::move(*next);
  475. if (!trySendMessage(message))
  476. return false;
  477. mSendQueue.pop();
  478. updateBufferedAmount(to_uint16(message->stream), -ptrdiff_t(message_size_func(message)));
  479. }
  480. if (!mSendQueue.running() && !std::exchange(mSendShutdown, true)) {
  481. PLOG_DEBUG << "SCTP shutdown";
  482. if (usrsctp_shutdown(mSock, SHUT_WR)) {
  483. if (errno == ENOTCONN) {
  484. PLOG_VERBOSE << "SCTP already shut down";
  485. } else {
  486. PLOG_WARNING << "SCTP shutdown failed, errno=" << errno;
  487. changeState(State::Disconnected);
  488. recv(nullptr);
  489. }
  490. }
  491. }
  492. return true;
  493. }
  494. bool SctpTransport::trySendMessage(message_ptr message) {
  495. // Requires mSendMutex to be locked
  496. if (state() != State::Connected)
  497. return false;
  498. uint32_t ppid;
  499. switch (message->type) {
  500. case Message::String:
  501. ppid = !message->empty() ? PPID_STRING : PPID_STRING_EMPTY;
  502. break;
  503. case Message::Binary:
  504. ppid = !message->empty() ? PPID_BINARY : PPID_BINARY_EMPTY;
  505. break;
  506. case Message::Control:
  507. ppid = PPID_CONTROL;
  508. break;
  509. case Message::Reset:
  510. sendReset(uint16_t(message->stream));
  511. return true;
  512. default:
  513. // Ignore
  514. return true;
  515. }
  516. PLOG_VERBOSE << "SCTP try send size=" << message->size();
  517. // TODO: Implement SCTP ndata specification draft when supported everywhere
  518. // See https://datatracker.ietf.org/doc/html/draft-ietf-tsvwg-sctp-ndata-08
  519. const Reliability reliability = message->reliability ? *message->reliability : Reliability();
  520. struct sctp_sendv_spa spa = {};
  521. // set sndinfo
  522. spa.sendv_flags |= SCTP_SEND_SNDINFO_VALID;
  523. spa.sendv_sndinfo.snd_sid = uint16_t(message->stream);
  524. spa.sendv_sndinfo.snd_ppid = htonl(ppid);
  525. spa.sendv_sndinfo.snd_flags |= SCTP_EOR; // implicit here
  526. // set prinfo
  527. spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
  528. if (reliability.unordered)
  529. spa.sendv_sndinfo.snd_flags |= SCTP_UNORDERED;
  530. if (reliability.maxPacketLifeTime) {
  531. spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
  532. spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_TTL;
  533. spa.sendv_prinfo.pr_value = to_uint32(reliability.maxPacketLifeTime->count());
  534. } else if (reliability.maxRetransmits) {
  535. spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
  536. spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_RTX;
  537. spa.sendv_prinfo.pr_value = to_uint32(*reliability.maxRetransmits);
  538. }
  539. // else {
  540. // spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_NONE;
  541. // }
  542. // Deprecated
  543. else switch (reliability.typeDeprecated) {
  544. case Reliability::Type::Rexmit:
  545. spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
  546. spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_RTX;
  547. spa.sendv_prinfo.pr_value = to_uint32(std::get<int>(reliability.rexmit));
  548. break;
  549. case Reliability::Type::Timed:
  550. spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
  551. spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_TTL;
  552. spa.sendv_prinfo.pr_value = to_uint32(std::get<milliseconds>(reliability.rexmit).count());
  553. break;
  554. default:
  555. spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_NONE;
  556. break;
  557. }
  558. ssize_t ret;
  559. if (!message->empty()) {
  560. ret = usrsctp_sendv(mSock, message->data(), message->size(), nullptr, 0, &spa, sizeof(spa),
  561. SCTP_SENDV_SPA, 0);
  562. } else {
  563. const char zero = 0;
  564. ret = usrsctp_sendv(mSock, &zero, 1, nullptr, 0, &spa, sizeof(spa), SCTP_SENDV_SPA, 0);
  565. }
  566. if (ret < 0) {
  567. if (errno == EWOULDBLOCK || errno == EAGAIN) {
  568. PLOG_VERBOSE << "SCTP sending not possible";
  569. return false;
  570. }
  571. PLOG_ERROR << "SCTP sending failed, errno=" << errno;
  572. throw std::runtime_error("Sending failed, errno=" + std::to_string(errno));
  573. }
  574. PLOG_VERBOSE << "SCTP sent size=" << message->size();
  575. if (message->type == Message::Binary || message->type == Message::String)
  576. mBytesSent += message->size();
  577. return true;
  578. }
  579. void SctpTransport::updateBufferedAmount(uint16_t streamId, ptrdiff_t delta) {
  580. // Requires mSendMutex to be locked
  581. if (delta == 0)
  582. return;
  583. auto it = mBufferedAmount.insert(std::make_pair(streamId, 0)).first;
  584. size_t amount = size_t(std::max(ptrdiff_t(it->second) + delta, ptrdiff_t(0)));
  585. if (amount == 0)
  586. mBufferedAmount.erase(it);
  587. else
  588. it->second = amount;
  589. // Synchronously call the buffered amount callback
  590. triggerBufferedAmount(streamId, amount);
  591. }
  592. void SctpTransport::triggerBufferedAmount(uint16_t streamId, size_t amount) {
  593. try {
  594. mBufferedAmountCallback(streamId, amount);
  595. } catch (const std::exception &e) {
  596. PLOG_WARNING << "SCTP buffered amount callback: " << e.what();
  597. }
  598. }
  599. void SctpTransport::sendReset(uint16_t streamId) {
  600. // Requires mSendMutex to be locked
  601. if (state() != State::Connected)
  602. return;
  603. PLOG_DEBUG << "SCTP resetting stream " << streamId;
  604. using srs_t = struct sctp_reset_streams;
  605. const size_t len = sizeof(srs_t) + sizeof(uint16_t);
  606. byte buffer[len] = {};
  607. srs_t &srs = *reinterpret_cast<srs_t *>(buffer);
  608. srs.srs_flags = SCTP_STREAM_RESET_OUTGOING;
  609. srs.srs_number_streams = 1;
  610. srs.srs_stream_list[0] = streamId;
  611. mWritten = false;
  612. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_RESET_STREAMS, &srs, len) == 0) {
  613. std::unique_lock lock(mWriteMutex); // locking before setsockopt might deadlock usrsctp...
  614. mWrittenCondition.wait_for(lock, 1000ms,
  615. [&]() { return mWritten || state() != State::Connected; });
  616. } else if (errno == EINVAL) {
  617. PLOG_DEBUG << "SCTP stream " << streamId << " already reset";
  618. } else {
  619. PLOG_WARNING << "SCTP reset stream " << streamId << " failed, errno=" << errno;
  620. }
  621. }
  622. void SctpTransport::handleUpcall() noexcept {
  623. try {
  624. PLOG_VERBOSE << "Handle upcall";
  625. int events = usrsctp_get_events(mSock);
  626. if (events & SCTP_EVENT_READ)
  627. enqueueRecv();
  628. if (events & SCTP_EVENT_WRITE)
  629. enqueueFlush();
  630. } catch (const std::exception &e) {
  631. PLOG_ERROR << "SCTP upcall: " << e.what();
  632. }
  633. }
  634. int SctpTransport::handleWrite(byte *data, size_t len, uint8_t /*tos*/,
  635. uint8_t /*set_df*/) noexcept {
  636. try {
  637. std::unique_lock lock(mWriteMutex);
  638. PLOG_VERBOSE << "Handle write, len=" << len;
  639. if (!outgoing(make_message(data, data + len)))
  640. return -1;
  641. mWritten = true;
  642. mWrittenOnce = true;
  643. mWrittenCondition.notify_all();
  644. } catch (const std::exception &e) {
  645. PLOG_ERROR << "SCTP write: " << e.what();
  646. return -1;
  647. }
  648. return 0; // success
  649. }
  650. void SctpTransport::processData(binary &&data, uint16_t sid, PayloadId ppid) {
  651. PLOG_VERBOSE << "Process data, size=" << data.size();
  652. // RFC 8831: The usage of the PPIDs "WebRTC String Partial" and "WebRTC Binary Partial" is
  653. // deprecated. They were used for a PPID-based fragmentation and reassembly of user messages
  654. // belonging to reliable and ordered data channels.
  655. // See https://www.rfc-editor.org/rfc/rfc8831.html#section-6.6
  656. // We handle those PPIDs at reception for compatibility reasons but shall never send them.
  657. switch (ppid) {
  658. case PPID_CONTROL:
  659. recv(make_message(std::move(data), Message::Control, sid));
  660. break;
  661. case PPID_STRING_PARTIAL: // deprecated
  662. mPartialStringData.insert(mPartialStringData.end(), data.begin(), data.end());
  663. mPartialStringData.resize(mMaxMessageSize);
  664. break;
  665. case PPID_STRING:
  666. if (mPartialStringData.empty()) {
  667. mBytesReceived += data.size();
  668. recv(make_message(std::move(data), Message::String, sid));
  669. } else {
  670. mPartialStringData.insert(mPartialStringData.end(), data.begin(), data.end());
  671. mPartialStringData.resize(mMaxMessageSize);
  672. mBytesReceived += mPartialStringData.size();
  673. auto message = make_message(std::move(mPartialStringData), Message::String, sid);
  674. mPartialStringData.clear();
  675. recv(std::move(message));
  676. }
  677. break;
  678. case PPID_STRING_EMPTY:
  679. recv(make_message(std::move(mPartialStringData), Message::String, sid));
  680. mPartialStringData.clear();
  681. break;
  682. case PPID_BINARY_PARTIAL: // deprecated
  683. mPartialBinaryData.insert(mPartialBinaryData.end(), data.begin(), data.end());
  684. mPartialBinaryData.resize(mMaxMessageSize);
  685. break;
  686. case PPID_BINARY:
  687. if (mPartialBinaryData.empty()) {
  688. mBytesReceived += data.size();
  689. recv(make_message(std::move(data), Message::Binary, sid));
  690. } else {
  691. mPartialBinaryData.insert(mPartialBinaryData.end(), data.begin(), data.end());
  692. mPartialBinaryData.resize(mMaxMessageSize);
  693. mBytesReceived += mPartialBinaryData.size();
  694. auto message = make_message(std::move(mPartialBinaryData), Message::Binary, sid);
  695. mPartialBinaryData.clear();
  696. recv(std::move(message));
  697. }
  698. break;
  699. case PPID_BINARY_EMPTY:
  700. recv(make_message(std::move(mPartialBinaryData), Message::Binary, sid));
  701. mPartialBinaryData.clear();
  702. break;
  703. default:
  704. // Unknown
  705. COUNTER_UNKNOWN_PPID++;
  706. PLOG_VERBOSE << "Unknown PPID: " << uint32_t(ppid);
  707. return;
  708. }
  709. }
  710. void SctpTransport::processNotification(const union sctp_notification *notify, size_t len) {
  711. if (len != size_t(notify->sn_header.sn_length)) {
  712. PLOG_WARNING << "Unexpected notification length, expected=" << notify->sn_header.sn_length
  713. << ", actual=" << len;
  714. return;
  715. }
  716. auto type = notify->sn_header.sn_type;
  717. PLOG_VERBOSE << "Processing notification, type=" << type;
  718. switch (type) {
  719. case SCTP_ASSOC_CHANGE: {
  720. PLOG_VERBOSE << "SCTP association change event";
  721. const struct sctp_assoc_change &sac = notify->sn_assoc_change;
  722. if (sac.sac_state == SCTP_COMM_UP) {
  723. PLOG_DEBUG << "SCTP negotiated streams: incoming=" << sac.sac_inbound_streams
  724. << ", outgoing=" << sac.sac_outbound_streams;
  725. mNegotiatedStreamsCount.emplace(
  726. std::min(sac.sac_inbound_streams, sac.sac_outbound_streams));
  727. PLOG_INFO << "SCTP connected";
  728. changeState(State::Connected);
  729. } else {
  730. if (state() == State::Connected) {
  731. PLOG_INFO << "SCTP disconnected";
  732. changeState(State::Disconnected);
  733. recv(nullptr);
  734. } else {
  735. PLOG_ERROR << "SCTP connection failed";
  736. changeState(State::Failed);
  737. }
  738. mWrittenCondition.notify_all();
  739. }
  740. break;
  741. }
  742. case SCTP_SENDER_DRY_EVENT: {
  743. PLOG_VERBOSE << "SCTP sender dry event";
  744. // It should not be necessary since the send callback should have been called already,
  745. // but to be sure, let's try to send now.
  746. flush();
  747. break;
  748. }
  749. case SCTP_STREAM_RESET_EVENT: {
  750. const struct sctp_stream_reset_event &reset_event = notify->sn_strreset_event;
  751. const int count = (reset_event.strreset_length - sizeof(reset_event)) / sizeof(uint16_t);
  752. const uint16_t flags = reset_event.strreset_flags;
  753. IF_PLOG(plog::verbose) {
  754. std::ostringstream desc;
  755. desc << "flags=";
  756. if (flags & SCTP_STREAM_RESET_OUTGOING_SSN && flags & SCTP_STREAM_RESET_INCOMING_SSN)
  757. desc << "outgoing|incoming";
  758. else if (flags & SCTP_STREAM_RESET_OUTGOING_SSN)
  759. desc << "outgoing";
  760. else if (flags & SCTP_STREAM_RESET_INCOMING_SSN)
  761. desc << "incoming";
  762. else
  763. desc << "0";
  764. desc << ", streams=[";
  765. for (int i = 0; i < count; ++i) {
  766. uint16_t streamId = reset_event.strreset_stream_list[i];
  767. desc << (i != 0 ? "," : "") << streamId;
  768. }
  769. desc << "]";
  770. PLOG_VERBOSE << "SCTP reset event, " << desc.str();
  771. }
  772. // RFC 8831 6.7. Closing a Data Channel
  773. // If one side decides to close the data channel, it resets the corresponding outgoing
  774. // stream. When the peer sees that an incoming stream was reset, it also resets its
  775. // corresponding outgoing stream.
  776. // See https://www.rfc-editor.org/rfc/rfc8831.html#section-6.7
  777. if (flags & SCTP_STREAM_RESET_INCOMING_SSN) {
  778. for (int i = 0; i < count; ++i) {
  779. uint16_t streamId = reset_event.strreset_stream_list[i];
  780. recv(make_message(0, Message::Reset, streamId));
  781. }
  782. }
  783. break;
  784. }
  785. default:
  786. // Ignore
  787. break;
  788. }
  789. }
  790. void SctpTransport::clearStats() {
  791. mBytesReceived = 0;
  792. mBytesSent = 0;
  793. }
  794. size_t SctpTransport::bytesSent() { return mBytesSent; }
  795. size_t SctpTransport::bytesReceived() { return mBytesReceived; }
  796. optional<milliseconds> SctpTransport::rtt() {
  797. if (state() != State::Connected)
  798. return nullopt;
  799. struct sctp_status status = {};
  800. socklen_t len = sizeof(status);
  801. if (usrsctp_getsockopt(mSock, IPPROTO_SCTP, SCTP_STATUS, &status, &len))
  802. return nullopt;
  803. return milliseconds(status.sstat_primary.spinfo_srtt);
  804. }
  805. void SctpTransport::UpcallCallback(struct socket *, void *arg, int /* flags */) {
  806. auto *transport = static_cast<SctpTransport *>(arg);
  807. if (auto locked = Instances->lock(transport))
  808. transport->handleUpcall();
  809. }
  810. int SctpTransport::WriteCallback(void *ptr, void *data, size_t len, uint8_t tos, uint8_t set_df) {
  811. auto *transport = static_cast<SctpTransport *>(ptr);
  812. // Workaround for sctplab/usrsctp#405: Send callback is invoked on already closed socket
  813. // https://github.com/sctplab/usrsctp/issues/405
  814. if (auto locked = Instances->lock(transport))
  815. return transport->handleWrite(static_cast<byte *>(data), len, tos, set_df);
  816. else
  817. return -1;
  818. }
  819. void SctpTransport::DebugCallback(const char *format, ...) {
  820. const size_t bufferSize = 1024;
  821. char buffer[bufferSize];
  822. va_list va;
  823. va_start(va, format);
  824. int len = std::vsnprintf(buffer, bufferSize, format, va);
  825. va_end(va);
  826. if (len <= 0)
  827. return;
  828. len = std::min(len, int(bufferSize - 1));
  829. buffer[len - 1] = '\0'; // remove newline
  830. PLOG_VERBOSE << "usrsctp: " << buffer; // usrsctp debug as verbose
  831. }
  832. } // namespace rtc::impl