sctptransport.cpp 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797
  1. /**
  2. * Copyright (c) 2019 Paul-Louis Ageneau
  3. *
  4. * This library is free software; you can redistribute it and/or
  5. * modify it under the terms of the GNU Lesser General Public
  6. * License as published by the Free Software Foundation; either
  7. * version 2.1 of the License, or (at your option) any later version.
  8. *
  9. * This library is distributed in the hope that it will be useful,
  10. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  12. * Lesser General Public License for more details.
  13. *
  14. * You should have received a copy of the GNU Lesser General Public
  15. * License along with this library; if not, write to the Free Software
  16. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  17. */
  18. #include "sctptransport.hpp"
  19. #include "logcounter.hpp"
  20. #include <chrono>
  21. #include <exception>
  22. #include <iostream>
  23. #include <thread>
  24. #include <vector>
  25. // The IETF draft says:
  26. // SCTP MUST support performing Path MTU discovery without relying on ICMP or ICMPv6 as specified in
  27. // [RFC4821] using probing messages specified in [RFC4820]. The initial Path MTU at the IP layer
  28. // SHOULD NOT exceed 1200 bytes for IPv4 and 1280 for IPv6.
  29. // See https://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-13#section-5
  30. //
  31. // However, usrsctp does not implement Path MTU discovery, so we need to disable it for now.
  32. // See https://github.com/sctplab/usrsctp/issues/205
  33. #define USE_PMTUD 0
  34. // TODO: When Path MTU discovery is supported, it needs to be enabled with libjuice as ICE backend
  35. // on all platforms except Mac OS where the Don't Fragment (DF) flag can't be set:
  36. /*
  37. #if !USE_NICE
  38. #ifndef __APPLE__
  39. // libjuice enables Linux path MTU discovery or sets the DF flag
  40. #define USE_PMTUD 1
  41. #else
  42. // Setting the DF flag is not available on Mac OS
  43. #define USE_PMTUD 0
  44. #endif
  45. #else // USE_NICE == 1
  46. #define USE_PMTUD 0
  47. #endif
  48. */
  49. using namespace std::chrono_literals;
  50. using namespace std::chrono;
  51. using std::shared_ptr;
  52. namespace rtc {
  53. static LogCounter COUNTER_UNKNOWN_PPID(plog::warning,
  54. "Number of SCTP packets received with an unknown PPID");
  55. static LogCounter
  56. COUNTER_BAD_NOTIF_LEN(plog::warning,
  57. "Number of SCTP packets received with an bad notification length");
  58. static LogCounter COUNTER_BAD_SCTP_STATUS(plog::warning,
  59. "Number of SCTP packets received with a bad status");
  60. std::unordered_set<SctpTransport *> SctpTransport::Instances;
  61. std::shared_mutex SctpTransport::InstancesMutex;
  62. void SctpTransport::Init() {
  63. usrsctp_init(0, &SctpTransport::WriteCallback, nullptr);
  64. usrsctp_sysctl_set_sctp_ecn_enable(0);
  65. usrsctp_sysctl_set_sctp_init_rtx_max_default(5);
  66. usrsctp_sysctl_set_sctp_path_rtx_max_default(5);
  67. usrsctp_sysctl_set_sctp_assoc_rtx_max_default(5); // single path
  68. usrsctp_sysctl_set_sctp_rto_min_default(1 * 1000); // ms
  69. usrsctp_sysctl_set_sctp_rto_max_default(10 * 1000); // ms
  70. usrsctp_sysctl_set_sctp_rto_initial_default(1 * 1000); // ms
  71. usrsctp_sysctl_set_sctp_init_rto_max_default(10 * 1000); // ms
  72. usrsctp_sysctl_set_sctp_heartbeat_interval_default(10 * 1000); // ms
  73. usrsctp_sysctl_set_sctp_max_chunks_on_queue(10 * 1024);
  74. // Change congestion control from the default TCP Reno (RFC 2581) to H-TCP
  75. usrsctp_sysctl_set_sctp_default_cc_module(SCTP_CC_HTCP);
  76. // Enable Non-Renegable Selective Acknowledgments (NR-SACKs)
  77. usrsctp_sysctl_set_sctp_nrsack_enable(1);
  78. // Increase the initial window size to 10 MTUs (RFC 6928)
  79. usrsctp_sysctl_set_sctp_initial_cwnd(10);
  80. // Reduce SACK delay from the default 200ms to 20ms
  81. usrsctp_sysctl_set_sctp_delayed_sack_time_default(20); // ms
  82. }
  83. void SctpTransport::Cleanup() {
  84. while (usrsctp_finish() != 0)
  85. std::this_thread::sleep_for(100ms);
  86. }
  87. SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
  88. message_callback recvCallback, amount_callback bufferedAmountCallback,
  89. state_callback stateChangeCallback)
  90. : Transport(lower, std::move(stateChangeCallback)), mPort(port), mPendingRecvCount(0),
  91. mSendQueue(0, message_size_func), mBufferedAmountCallback(std::move(bufferedAmountCallback)) {
  92. onRecv(recvCallback);
  93. PLOG_DEBUG << "Initializing SCTP transport";
  94. usrsctp_register_address(this);
  95. {
  96. std::unique_lock lock(InstancesMutex);
  97. Instances.insert(this);
  98. }
  99. mSock = usrsctp_socket(AF_CONN, SOCK_STREAM, IPPROTO_SCTP, nullptr, nullptr, 0, nullptr);
  100. if (!mSock)
  101. throw std::runtime_error("Could not create SCTP socket, errno=" + std::to_string(errno));
  102. usrsctp_set_upcall(mSock, &SctpTransport::UpcallCallback, this);
  103. if (usrsctp_set_non_blocking(mSock, 1))
  104. throw std::runtime_error("Unable to set non-blocking mode, errno=" + std::to_string(errno));
  105. // SCTP must stop sending after the lower layer is shut down, so disable linger
  106. struct linger sol = {};
  107. sol.l_onoff = 1;
  108. sol.l_linger = 0;
  109. if (usrsctp_setsockopt(mSock, SOL_SOCKET, SO_LINGER, &sol, sizeof(sol)))
  110. throw std::runtime_error("Could not set socket option SO_LINGER, errno=" +
  111. std::to_string(errno));
  112. struct sctp_assoc_value av = {};
  113. av.assoc_id = SCTP_ALL_ASSOC;
  114. av.assoc_value = 1;
  115. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, &av, sizeof(av)))
  116. throw std::runtime_error("Could not set socket option SCTP_ENABLE_STREAM_RESET, errno=" +
  117. std::to_string(errno));
  118. int on = 1;
  119. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_RECVRCVINFO, &on, sizeof(on)))
  120. throw std::runtime_error("Could set socket option SCTP_RECVRCVINFO, errno=" +
  121. std::to_string(errno));
  122. struct sctp_event se = {};
  123. se.se_assoc_id = SCTP_ALL_ASSOC;
  124. se.se_on = 1;
  125. se.se_type = SCTP_ASSOC_CHANGE;
  126. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_EVENT, &se, sizeof(se)))
  127. throw std::runtime_error("Could not subscribe to event SCTP_ASSOC_CHANGE, errno=" +
  128. std::to_string(errno));
  129. se.se_type = SCTP_SENDER_DRY_EVENT;
  130. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_EVENT, &se, sizeof(se)))
  131. throw std::runtime_error("Could not subscribe to event SCTP_SENDER_DRY_EVENT, errno=" +
  132. std::to_string(errno));
  133. se.se_type = SCTP_STREAM_RESET_EVENT;
  134. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_EVENT, &se, sizeof(se)))
  135. throw std::runtime_error("Could not subscribe to event SCTP_STREAM_RESET_EVENT, errno=" +
  136. std::to_string(errno));
  137. // The sender SHOULD disable the Nagle algorithm (see RFC1122) to minimize the latency.
  138. // See https://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-13#section-6.6
  139. int nodelay = 1;
  140. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_NODELAY, &nodelay, sizeof(nodelay)))
  141. throw std::runtime_error("Could not set socket option SCTP_NODELAY, errno=" +
  142. std::to_string(errno));
  143. struct sctp_paddrparams spp = {};
  144. // Enable SCTP heartbeats
  145. spp.spp_flags = SPP_HB_ENABLE;
  146. // RFC 8261 5. DTLS considerations:
  147. // If path MTU discovery is performed by the SCTP layer and IPv4 is used as the network-layer
  148. // protocol, the DTLS implementation SHOULD allow the DTLS user to enforce that the
  149. // corresponding IPv4 packet is sent with the Don't Fragment (DF) bit set. If controlling the DF
  150. // bit is not possible (for example, due to implementation restrictions), a safe value for the
  151. // path MTU has to be used by the SCTP stack. It is RECOMMENDED that the safe value not exceed
  152. // 1200 bytes.
  153. // See https://tools.ietf.org/html/rfc8261#section-5
  154. #if USE_PMTUD
  155. // Enable SCTP path MTU discovery
  156. spp.spp_flags |= SPP_PMTUD_ENABLE;
  157. #else
  158. // Fall back to a safe MTU value.
  159. spp.spp_flags |= SPP_PMTUD_DISABLE;
  160. spp.spp_pathmtu = 1200;
  161. #endif
  162. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS, &spp, sizeof(spp)))
  163. throw std::runtime_error("Could not set socket option SCTP_PEER_ADDR_PARAMS, errno=" +
  164. std::to_string(errno));
  165. // The IETF draft recommends the number of streams negotiated during SCTP association to be
  166. // 65535.
  167. // See https://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-13#section-6.2
  168. struct sctp_initmsg sinit = {};
  169. sinit.sinit_num_ostreams = 65535;
  170. sinit.sinit_max_instreams = 65535;
  171. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_INITMSG, &sinit, sizeof(sinit)))
  172. throw std::runtime_error("Could not set socket option SCTP_INITMSG, errno=" +
  173. std::to_string(errno));
  174. // Prevent fragmented interleave of messages (i.e. level 0), see RFC 6458 8.1.20.
  175. // Unless the user has set the fragmentation interleave level to 0, notifications
  176. // may also be interleaved with partially delivered messages.
  177. int level = 0;
  178. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_FRAGMENT_INTERLEAVE, &level, sizeof(level)))
  179. throw std::runtime_error("Could not disable SCTP fragmented interleave, errno=" +
  180. std::to_string(errno));
  181. // The default send and receive window size of usrsctp is 256KiB, which is too small for
  182. // realistic RTTs, therefore we increase it to 1MiB for better performance.
  183. // See https://bugzilla.mozilla.org/show_bug.cgi?id=1051685
  184. int bufferSize = 1024 * 1024;
  185. if (usrsctp_setsockopt(mSock, SOL_SOCKET, SO_RCVBUF, &bufferSize, sizeof(bufferSize)))
  186. throw std::runtime_error("Could not set SCTP recv buffer size, errno=" +
  187. std::to_string(errno));
  188. if (usrsctp_setsockopt(mSock, SOL_SOCKET, SO_SNDBUF, &bufferSize, sizeof(bufferSize)))
  189. throw std::runtime_error("Could not set SCTP send buffer size, errno=" +
  190. std::to_string(errno));
  191. }
  192. SctpTransport::~SctpTransport() {
  193. stop();
  194. close();
  195. usrsctp_deregister_address(this);
  196. {
  197. std::unique_lock lock(InstancesMutex);
  198. Instances.erase(this);
  199. }
  200. }
  201. void SctpTransport::start() {
  202. Transport::start();
  203. registerIncoming();
  204. connect();
  205. }
  206. bool SctpTransport::stop() {
  207. // Transport::stop() will unregister incoming() from the lower layer, therefore we need to make
  208. // sure the thread from lower layers is not blocked in incoming() by the WrittenOnce condition.
  209. mWrittenOnce = true;
  210. mWrittenCondition.notify_all();
  211. if (!Transport::stop())
  212. return false;
  213. mSendQueue.stop();
  214. safeFlush();
  215. shutdown();
  216. onRecv(nullptr);
  217. return true;
  218. }
  219. void SctpTransport::close() {
  220. if (mSock) {
  221. mProcessor.join();
  222. usrsctp_close(mSock);
  223. mSock = nullptr;
  224. }
  225. }
  226. void SctpTransport::connect() {
  227. if (!mSock)
  228. throw std::logic_error("Attempted SCTP connect with closed socket");
  229. PLOG_DEBUG << "SCTP connecting";
  230. changeState(State::Connecting);
  231. struct sockaddr_conn sconn = {};
  232. sconn.sconn_family = AF_CONN;
  233. sconn.sconn_port = htons(mPort);
  234. sconn.sconn_addr = this;
  235. #ifdef HAVE_SCONN_LEN
  236. sconn.sconn_len = sizeof(sconn);
  237. #endif
  238. if (usrsctp_bind(mSock, reinterpret_cast<struct sockaddr *>(&sconn), sizeof(sconn)))
  239. throw std::runtime_error("Could not bind usrsctp socket, errno=" + std::to_string(errno));
  240. // According to the IETF draft, both endpoints must initiate the SCTP association, in a
  241. // simultaneous-open manner, irrelevent to the SDP setup role.
  242. // See https://tools.ietf.org/html/draft-ietf-mmusic-sctp-sdp-26#section-9.3
  243. int ret = usrsctp_connect(mSock, reinterpret_cast<struct sockaddr *>(&sconn), sizeof(sconn));
  244. if (ret && errno != EINPROGRESS)
  245. throw std::runtime_error("Connection attempt failed, errno=" + std::to_string(errno));
  246. }
  247. void SctpTransport::shutdown() {
  248. if (!mSock)
  249. return;
  250. PLOG_DEBUG << "SCTP shutdown";
  251. if (usrsctp_shutdown(mSock, SHUT_RDWR) != 0 && errno != ENOTCONN) {
  252. PLOG_WARNING << "SCTP shutdown failed, errno=" << errno;
  253. }
  254. close();
  255. PLOG_INFO << "SCTP disconnected";
  256. changeState(State::Disconnected);
  257. mWrittenCondition.notify_all();
  258. }
  259. bool SctpTransport::send(message_ptr message) {
  260. std::lock_guard lock(mSendMutex);
  261. if (!message)
  262. return trySendQueue();
  263. PLOG_VERBOSE << "Send size=" << message->size();
  264. // Flush the queue, and if nothing is pending, try to send directly
  265. if (trySendQueue() && trySendMessage(message))
  266. return true;
  267. mSendQueue.push(message);
  268. updateBufferedAmount(uint16_t(message->stream), long(message_size_func(message)));
  269. return false;
  270. }
  271. void SctpTransport::closeStream(unsigned int stream) {
  272. send(make_message(0, Message::Reset, uint16_t(stream)));
  273. }
  274. void SctpTransport::flush() {
  275. std::lock_guard lock(mSendMutex);
  276. trySendQueue();
  277. }
  278. void SctpTransport::incoming(message_ptr message) {
  279. // There could be a race condition here where we receive the remote INIT before the local one is
  280. // sent, which would result in the connection being aborted. Therefore, we need to wait for data
  281. // to be sent on our side (i.e. the local INIT) before proceeding.
  282. if (!mWrittenOnce) { // test the atomic boolean is not set first to prevent a lock contention
  283. std::unique_lock lock(mWriteMutex);
  284. mWrittenCondition.wait(lock, [&]() { return mWrittenOnce.load(); });
  285. }
  286. if (!message) {
  287. PLOG_INFO << "SCTP disconnected";
  288. changeState(State::Disconnected);
  289. recv(nullptr);
  290. return;
  291. }
  292. PLOG_VERBOSE << "Incoming size=" << message->size();
  293. usrsctp_conninput(this, message->data(), message->size(), 0);
  294. }
  295. bool SctpTransport::outgoing(message_ptr message) {
  296. // Set recommended medium-priority DSCP value
  297. // See https://tools.ietf.org/html/draft-ietf-tsvwg-rtcweb-qos-18
  298. message->dscp = 10; // AF11: Assured Forwarding class 1, low drop probability
  299. return Transport::outgoing(std::move(message));
  300. }
  301. void SctpTransport::doRecv() {
  302. std::lock_guard lock(mRecvMutex);
  303. --mPendingRecvCount;
  304. try {
  305. while (state() != State::Disconnected && state() != State::Failed) {
  306. const size_t bufferSize = 65536;
  307. byte buffer[bufferSize];
  308. socklen_t fromlen = 0;
  309. struct sctp_rcvinfo info = {};
  310. socklen_t infolen = sizeof(info);
  311. unsigned int infotype = 0;
  312. int flags = 0;
  313. ssize_t len = usrsctp_recvv(mSock, buffer, bufferSize, nullptr, &fromlen, &info,
  314. &infolen, &infotype, &flags);
  315. if (len < 0) {
  316. if (errno == EWOULDBLOCK || errno == EAGAIN || errno == ECONNRESET)
  317. break;
  318. else
  319. throw std::runtime_error("SCTP recv failed, errno=" + std::to_string(errno));
  320. }
  321. PLOG_VERBOSE << "SCTP recv, len=" << len;
  322. // SCTP_FRAGMENT_INTERLEAVE does not seem to work as expected for messages > 64KB,
  323. // therefore partial notifications and messages need to be handled separately.
  324. if (flags & MSG_NOTIFICATION) {
  325. // SCTP event notification
  326. mPartialNotification.insert(mPartialNotification.end(), buffer, buffer + len);
  327. if (flags & MSG_EOR) {
  328. // Notification is complete, process it
  329. auto notification =
  330. reinterpret_cast<union sctp_notification *>(mPartialNotification.data());
  331. processNotification(notification, mPartialNotification.size());
  332. mPartialNotification.clear();
  333. }
  334. } else {
  335. // SCTP message
  336. mPartialMessage.insert(mPartialMessage.end(), buffer, buffer + len);
  337. if (flags & MSG_EOR) {
  338. // Message is complete, process it
  339. if (infotype != SCTP_RECVV_RCVINFO)
  340. throw std::runtime_error("Missing SCTP recv info");
  341. processData(std::move(mPartialMessage), info.rcv_sid,
  342. PayloadId(ntohl(info.rcv_ppid)));
  343. mPartialMessage.clear();
  344. }
  345. }
  346. }
  347. } catch (const std::exception &e) {
  348. PLOG_WARNING << e.what();
  349. }
  350. }
  351. bool SctpTransport::trySendQueue() {
  352. // Requires mSendMutex to be locked
  353. while (auto next = mSendQueue.peek()) {
  354. message_ptr message = std::move(*next);
  355. if (!trySendMessage(message))
  356. return false;
  357. mSendQueue.pop();
  358. updateBufferedAmount(uint16_t(message->stream), -long(message_size_func(message)));
  359. }
  360. return true;
  361. }
  362. bool SctpTransport::trySendMessage(message_ptr message) {
  363. // Requires mSendMutex to be locked
  364. if (!mSock || state() != State::Connected)
  365. return false;
  366. uint32_t ppid;
  367. switch (message->type) {
  368. case Message::String:
  369. ppid = !message->empty() ? PPID_STRING : PPID_STRING_EMPTY;
  370. break;
  371. case Message::Binary:
  372. ppid = !message->empty() ? PPID_BINARY : PPID_BINARY_EMPTY;
  373. break;
  374. case Message::Control:
  375. ppid = PPID_CONTROL;
  376. break;
  377. case Message::Reset:
  378. sendReset(uint16_t(message->stream));
  379. return true;
  380. default:
  381. // Ignore
  382. return true;
  383. }
  384. PLOG_VERBOSE << "SCTP try send size=" << message->size();
  385. // TODO: Implement SCTP ndata specification draft when supported everywhere
  386. // See https://tools.ietf.org/html/draft-ietf-tsvwg-sctp-ndata-08
  387. const Reliability reliability = message->reliability ? *message->reliability : Reliability();
  388. struct sctp_sendv_spa spa = {};
  389. // set sndinfo
  390. spa.sendv_flags |= SCTP_SEND_SNDINFO_VALID;
  391. spa.sendv_sndinfo.snd_sid = uint16_t(message->stream);
  392. spa.sendv_sndinfo.snd_ppid = htonl(ppid);
  393. spa.sendv_sndinfo.snd_flags |= SCTP_EOR; // implicit here
  394. // set prinfo
  395. spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
  396. if (reliability.unordered)
  397. spa.sendv_sndinfo.snd_flags |= SCTP_UNORDERED;
  398. switch (reliability.type) {
  399. case Reliability::Type::Rexmit:
  400. spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
  401. spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_RTX;
  402. spa.sendv_prinfo.pr_value = uint32_t(std::get<int>(reliability.rexmit));
  403. break;
  404. case Reliability::Type::Timed:
  405. spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
  406. spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_TTL;
  407. spa.sendv_prinfo.pr_value = uint32_t(std::get<milliseconds>(reliability.rexmit).count());
  408. break;
  409. default:
  410. spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_NONE;
  411. break;
  412. }
  413. ssize_t ret;
  414. if (!message->empty()) {
  415. ret = usrsctp_sendv(mSock, message->data(), message->size(), nullptr, 0, &spa, sizeof(spa),
  416. SCTP_SENDV_SPA, 0);
  417. } else {
  418. const char zero = 0;
  419. ret = usrsctp_sendv(mSock, &zero, 1, nullptr, 0, &spa, sizeof(spa), SCTP_SENDV_SPA, 0);
  420. }
  421. if (ret < 0) {
  422. if (errno == EWOULDBLOCK || errno == EAGAIN) {
  423. PLOG_VERBOSE << "SCTP sending not possible";
  424. return false;
  425. }
  426. PLOG_ERROR << "SCTP sending failed, errno=" << errno;
  427. throw std::runtime_error("Sending failed, errno=" + std::to_string(errno));
  428. }
  429. PLOG_VERBOSE << "SCTP sent size=" << message->size();
  430. if (message->type == Message::Type::Binary || message->type == Message::Type::String)
  431. mBytesSent += message->size();
  432. return true;
  433. }
  434. void SctpTransport::updateBufferedAmount(uint16_t streamId, long delta) {
  435. // Requires mSendMutex to be locked
  436. auto it = mBufferedAmount.insert(std::make_pair(streamId, 0)).first;
  437. size_t amount = size_t(std::max(long(it->second) + delta, long(0)));
  438. if (amount == 0)
  439. mBufferedAmount.erase(it);
  440. else
  441. it->second = amount;
  442. mSendMutex.unlock();
  443. try {
  444. mBufferedAmountCallback(streamId, amount);
  445. } catch (const std::exception &e) {
  446. PLOG_DEBUG << "SCTP buffered amount callback: " << e.what();
  447. }
  448. mSendMutex.lock();
  449. }
  450. void SctpTransport::sendReset(uint16_t streamId) {
  451. // Requires mSendMutex to be locked
  452. if (!mSock || state() != State::Connected)
  453. return;
  454. PLOG_DEBUG << "SCTP resetting stream " << streamId;
  455. using srs_t = struct sctp_reset_streams;
  456. const size_t len = sizeof(srs_t) + sizeof(uint16_t);
  457. byte buffer[len] = {};
  458. srs_t &srs = *reinterpret_cast<srs_t *>(buffer);
  459. srs.srs_flags = SCTP_STREAM_RESET_OUTGOING;
  460. srs.srs_number_streams = 1;
  461. srs.srs_stream_list[0] = streamId;
  462. mWritten = false;
  463. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_RESET_STREAMS, &srs, len) == 0) {
  464. std::unique_lock lock(mWriteMutex); // locking before setsockopt might deadlock usrsctp...
  465. mWrittenCondition.wait_for(lock, 1000ms,
  466. [&]() { return mWritten || state() != State::Connected; });
  467. } else if (errno == EINVAL) {
  468. PLOG_DEBUG << "SCTP stream " << streamId << " already reset";
  469. } else {
  470. PLOG_WARNING << "SCTP reset stream " << streamId << " failed, errno=" << errno;
  471. }
  472. }
  473. bool SctpTransport::safeFlush() {
  474. try {
  475. flush();
  476. return true;
  477. } catch (const std::exception &e) {
  478. PLOG_WARNING << "SCTP flush: " << e.what();
  479. return false;
  480. }
  481. }
  482. void SctpTransport::handleUpcall() {
  483. if (!mSock)
  484. return;
  485. PLOG_VERBOSE << "Handle upcall";
  486. int events = usrsctp_get_events(mSock);
  487. if (events & SCTP_EVENT_READ && mPendingRecvCount == 0) {
  488. ++mPendingRecvCount;
  489. mProcessor.enqueue(&SctpTransport::doRecv, this);
  490. }
  491. if (events & SCTP_EVENT_WRITE)
  492. mProcessor.enqueue(&SctpTransport::safeFlush, this);
  493. }
  494. int SctpTransport::handleWrite(byte *data, size_t len, uint8_t /*tos*/, uint8_t /*set_df*/) {
  495. try {
  496. std::unique_lock lock(mWriteMutex);
  497. PLOG_VERBOSE << "Handle write, len=" << len;
  498. if (!outgoing(make_message(data, data + len)))
  499. return -1;
  500. mWritten = true;
  501. mWrittenOnce = true;
  502. mWrittenCondition.notify_all();
  503. } catch (const std::exception &e) {
  504. PLOG_ERROR << "SCTP write: " << e.what();
  505. return -1;
  506. }
  507. return 0; // success
  508. }
  509. void SctpTransport::processData(binary &&data, uint16_t sid, PayloadId ppid) {
  510. PLOG_VERBOSE << "Process data, size=" << data.size();
  511. // The usage of the PPIDs "WebRTC String Partial" and "WebRTC Binary Partial" is deprecated.
  512. // See https://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-13#section-6.6
  513. // We handle them at reception for compatibility reasons but should never send them.
  514. switch (ppid) {
  515. case PPID_CONTROL:
  516. recv(make_message(std::move(data), Message::Control, sid));
  517. break;
  518. case PPID_STRING_PARTIAL: // deprecated
  519. mPartialStringData.insert(mPartialStringData.end(), data.begin(), data.end());
  520. break;
  521. case PPID_STRING:
  522. if (mPartialStringData.empty()) {
  523. mBytesReceived += data.size();
  524. recv(make_message(std::move(data), Message::String, sid));
  525. } else {
  526. mPartialStringData.insert(mPartialStringData.end(), data.begin(), data.end());
  527. mBytesReceived += mPartialStringData.size();
  528. recv(make_message(std::move(mPartialStringData), Message::String, sid));
  529. mPartialStringData.clear();
  530. }
  531. break;
  532. case PPID_STRING_EMPTY:
  533. recv(make_message(std::move(mPartialStringData), Message::String, sid));
  534. mPartialStringData.clear();
  535. break;
  536. case PPID_BINARY_PARTIAL: // deprecated
  537. mPartialBinaryData.insert(mPartialBinaryData.end(), data.begin(), data.end());
  538. break;
  539. case PPID_BINARY:
  540. if (mPartialBinaryData.empty()) {
  541. mBytesReceived += data.size();
  542. recv(make_message(std::move(data), Message::Binary, sid));
  543. } else {
  544. mPartialBinaryData.insert(mPartialBinaryData.end(), data.begin(), data.end());
  545. mBytesReceived += mPartialBinaryData.size();
  546. recv(make_message(std::move(mPartialBinaryData), Message::Binary, sid));
  547. mPartialBinaryData.clear();
  548. }
  549. break;
  550. case PPID_BINARY_EMPTY:
  551. recv(make_message(std::move(mPartialBinaryData), Message::Binary, sid));
  552. mPartialBinaryData.clear();
  553. break;
  554. default:
  555. // Unknown
  556. COUNTER_UNKNOWN_PPID++;
  557. PLOG_VERBOSE << "Unknown PPID: " << uint32_t(ppid);
  558. return;
  559. }
  560. }
  561. void SctpTransport::processNotification(const union sctp_notification *notify, size_t len) {
  562. if (len != size_t(notify->sn_header.sn_length)) {
  563. COUNTER_BAD_NOTIF_LEN++;
  564. return;
  565. }
  566. auto type = notify->sn_header.sn_type;
  567. PLOG_VERBOSE << "Processing notification, type=" << type;
  568. switch (type) {
  569. case SCTP_ASSOC_CHANGE: {
  570. const struct sctp_assoc_change &assoc_change = notify->sn_assoc_change;
  571. if (assoc_change.sac_state == SCTP_COMM_UP) {
  572. PLOG_INFO << "SCTP connected";
  573. changeState(State::Connected);
  574. } else {
  575. if (state() == State::Connecting) {
  576. PLOG_ERROR << "SCTP connection failed";
  577. changeState(State::Failed);
  578. } else {
  579. PLOG_INFO << "SCTP disconnected";
  580. changeState(State::Disconnected);
  581. }
  582. mWrittenCondition.notify_all();
  583. }
  584. break;
  585. }
  586. case SCTP_SENDER_DRY_EVENT: {
  587. PLOG_VERBOSE << "SCTP dry event";
  588. // It should not be necessary since the send callback should have been called already,
  589. // but to be sure, let's try to send now.
  590. safeFlush();
  591. break;
  592. }
  593. case SCTP_STREAM_RESET_EVENT: {
  594. const struct sctp_stream_reset_event &reset_event = notify->sn_strreset_event;
  595. const int count = (reset_event.strreset_length - sizeof(reset_event)) / sizeof(uint16_t);
  596. const uint16_t flags = reset_event.strreset_flags;
  597. IF_PLOG(plog::verbose) {
  598. std::ostringstream desc;
  599. desc << "flags=";
  600. if (flags & SCTP_STREAM_RESET_OUTGOING_SSN && flags & SCTP_STREAM_RESET_INCOMING_SSN)
  601. desc << "outgoing|incoming";
  602. else if (flags & SCTP_STREAM_RESET_OUTGOING_SSN)
  603. desc << "outgoing";
  604. else if (flags & SCTP_STREAM_RESET_INCOMING_SSN)
  605. desc << "incoming";
  606. else
  607. desc << "0";
  608. desc << ", streams=[";
  609. for (int i = 0; i < count; ++i) {
  610. uint16_t streamId = reset_event.strreset_stream_list[i];
  611. desc << (i != 0 ? "," : "") << streamId;
  612. }
  613. desc << "]";
  614. PLOG_VERBOSE << "SCTP reset event, " << desc.str();
  615. }
  616. if (flags & SCTP_STREAM_RESET_OUTGOING_SSN) {
  617. for (int i = 0; i < count; ++i) {
  618. uint16_t streamId = reset_event.strreset_stream_list[i];
  619. closeStream(streamId);
  620. }
  621. }
  622. if (flags & SCTP_STREAM_RESET_INCOMING_SSN) {
  623. const byte dataChannelCloseMessage{0x04};
  624. for (int i = 0; i < count; ++i) {
  625. uint16_t streamId = reset_event.strreset_stream_list[i];
  626. recv(make_message(&dataChannelCloseMessage, &dataChannelCloseMessage + 1,
  627. Message::Control, streamId));
  628. }
  629. }
  630. break;
  631. }
  632. default:
  633. // Ignore
  634. break;
  635. }
  636. }
  637. void SctpTransport::clearStats() {
  638. mBytesReceived = 0;
  639. mBytesSent = 0;
  640. }
  641. size_t SctpTransport::bytesSent() { return mBytesSent; }
  642. size_t SctpTransport::bytesReceived() { return mBytesReceived; }
  643. std::optional<milliseconds> SctpTransport::rtt() {
  644. if (!mSock || state() != State::Connected)
  645. return nullopt;
  646. struct sctp_status status = {};
  647. socklen_t len = sizeof(status);
  648. if (usrsctp_getsockopt(mSock, IPPROTO_SCTP, SCTP_STATUS, &status, &len)) {
  649. COUNTER_BAD_SCTP_STATUS++;
  650. return nullopt;
  651. }
  652. return milliseconds(status.sstat_primary.spinfo_srtt);
  653. }
  654. void SctpTransport::UpcallCallback(struct socket *, void *arg, int /* flags */) {
  655. auto *transport = static_cast<SctpTransport *>(arg);
  656. std::shared_lock lock(InstancesMutex);
  657. if (Instances.find(transport) == Instances.end())
  658. return;
  659. transport->handleUpcall();
  660. }
  661. int SctpTransport::WriteCallback(void *ptr, void *data, size_t len, uint8_t tos, uint8_t set_df) {
  662. auto *transport = static_cast<SctpTransport *>(ptr);
  663. // Workaround for sctplab/usrsctp#405: Send callback is invoked on already closed socket
  664. // https://github.com/sctplab/usrsctp/issues/405
  665. std::shared_lock lock(InstancesMutex);
  666. if (Instances.find(transport) == Instances.end())
  667. return -1;
  668. return transport->handleWrite(static_cast<byte *>(data), len, tos, set_df);
  669. }
  670. } // namespace rtc