sctptransport.cpp 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769
  1. /**
  2. * Copyright (c) 2019 Paul-Louis Ageneau
  3. *
  4. * This library is free software; you can redistribute it and/or
  5. * modify it under the terms of the GNU Lesser General Public
  6. * License as published by the Free Software Foundation; either
  7. * version 2.1 of the License, or (at your option) any later version.
  8. *
  9. * This library is distributed in the hope that it will be useful,
  10. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  12. * Lesser General Public License for more details.
  13. *
  14. * You should have received a copy of the GNU Lesser General Public
  15. * License along with this library; if not, write to the Free Software
  16. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  17. */
  18. #include "sctptransport.hpp"
  19. #include <chrono>
  20. #include <exception>
  21. #include <iostream>
  22. #include <thread>
  23. #include <vector>
  24. #if !USE_NICE
  25. #ifndef __APPLE__
  26. // libjuice enables Linux path MTU discovery or sets the DF flag
  27. #define USE_PMTUD 1
  28. #else
  29. // Setting the DF flag is not available on Mac OS
  30. #define USE_PMTUD 0
  31. #endif
  32. #else // USE_NICE == 1
  33. #ifdef __linux__
  34. // Linux UDP does path MTU discovery by default (setting DF and returning EMSGSIZE)
  35. // It should be safe to enable discovery for SCTP.
  36. #define USE_PMTUD 1
  37. #else
  38. // Otherwise assume fragmentation
  39. #define USE_PMTUD 0
  40. #endif
  41. #endif
  42. using namespace std::chrono_literals;
  43. using namespace std::chrono;
  44. using std::shared_ptr;
  45. namespace rtc {
  46. std::unordered_set<SctpTransport *> SctpTransport::Instances;
  47. std::shared_mutex SctpTransport::InstancesMutex;
  48. void SctpTransport::Init() {
  49. usrsctp_init(0, &SctpTransport::WriteCallback, nullptr);
  50. usrsctp_sysctl_set_sctp_ecn_enable(0);
  51. usrsctp_sysctl_set_sctp_init_rtx_max_default(5);
  52. usrsctp_sysctl_set_sctp_path_rtx_max_default(5);
  53. usrsctp_sysctl_set_sctp_assoc_rtx_max_default(5); // single path
  54. usrsctp_sysctl_set_sctp_rto_min_default(1 * 1000); // ms
  55. usrsctp_sysctl_set_sctp_rto_max_default(10 * 1000); // ms
  56. usrsctp_sysctl_set_sctp_rto_initial_default(1 * 1000); // ms
  57. usrsctp_sysctl_set_sctp_init_rto_max_default(10 * 1000); // ms
  58. usrsctp_sysctl_set_sctp_heartbeat_interval_default(10 * 1000); // ms
  59. usrsctp_sysctl_set_sctp_max_chunks_on_queue(10 * 1024);
  60. // Change congestion control from the default TCP Reno (RFC 2581) to H-TCP
  61. usrsctp_sysctl_set_sctp_default_cc_module(SCTP_CC_HTCP);
  62. // Enable Non-Renegable Selective Acknowledgments (NR-SACKs)
  63. usrsctp_sysctl_set_sctp_nrsack_enable(1);
  64. // Increase the initial window size to 10 MTUs (RFC 6928)
  65. usrsctp_sysctl_set_sctp_initial_cwnd(10);
  66. // Reduce SACK delay from the default 200ms to 20ms
  67. usrsctp_sysctl_set_sctp_delayed_sack_time_default(20); // ms
  68. }
  69. void SctpTransport::Cleanup() {
  70. while (usrsctp_finish() != 0)
  71. std::this_thread::sleep_for(100ms);
  72. }
  73. SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
  74. message_callback recvCallback, amount_callback bufferedAmountCallback,
  75. state_callback stateChangeCallback)
  76. : Transport(lower, std::move(stateChangeCallback)), mPort(port), mPendingRecvCount(0),
  77. mSendQueue(0, message_size_func), mBufferedAmountCallback(std::move(bufferedAmountCallback)) {
  78. onRecv(recvCallback);
  79. PLOG_DEBUG << "Initializing SCTP transport";
  80. usrsctp_register_address(this);
  81. {
  82. std::unique_lock lock(InstancesMutex);
  83. Instances.insert(this);
  84. }
  85. mSock = usrsctp_socket(AF_CONN, SOCK_STREAM, IPPROTO_SCTP, nullptr, nullptr, 0, nullptr);
  86. if (!mSock)
  87. throw std::runtime_error("Could not create SCTP socket, errno=" + std::to_string(errno));
  88. usrsctp_set_upcall(mSock, &SctpTransport::UpcallCallback, this);
  89. if (usrsctp_set_non_blocking(mSock, 1))
  90. throw std::runtime_error("Unable to set non-blocking mode, errno=" + std::to_string(errno));
  91. // SCTP must stop sending after the lower layer is shut down, so disable linger
  92. struct linger sol = {};
  93. sol.l_onoff = 1;
  94. sol.l_linger = 0;
  95. if (usrsctp_setsockopt(mSock, SOL_SOCKET, SO_LINGER, &sol, sizeof(sol)))
  96. throw std::runtime_error("Could not set socket option SO_LINGER, errno=" +
  97. std::to_string(errno));
  98. struct sctp_assoc_value av = {};
  99. av.assoc_id = SCTP_ALL_ASSOC;
  100. av.assoc_value = 1;
  101. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, &av, sizeof(av)))
  102. throw std::runtime_error("Could not set socket option SCTP_ENABLE_STREAM_RESET, errno=" +
  103. std::to_string(errno));
  104. int on = 1;
  105. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_RECVRCVINFO, &on, sizeof(on)))
  106. throw std::runtime_error("Could set socket option SCTP_RECVRCVINFO, errno=" +
  107. std::to_string(errno));
  108. struct sctp_event se = {};
  109. se.se_assoc_id = SCTP_ALL_ASSOC;
  110. se.se_on = 1;
  111. se.se_type = SCTP_ASSOC_CHANGE;
  112. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_EVENT, &se, sizeof(se)))
  113. throw std::runtime_error("Could not subscribe to event SCTP_ASSOC_CHANGE, errno=" +
  114. std::to_string(errno));
  115. se.se_type = SCTP_SENDER_DRY_EVENT;
  116. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_EVENT, &se, sizeof(se)))
  117. throw std::runtime_error("Could not subscribe to event SCTP_SENDER_DRY_EVENT, errno=" +
  118. std::to_string(errno));
  119. se.se_type = SCTP_STREAM_RESET_EVENT;
  120. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_EVENT, &se, sizeof(se)))
  121. throw std::runtime_error("Could not subscribe to event SCTP_STREAM_RESET_EVENT, errno=" +
  122. std::to_string(errno));
  123. // The sender SHOULD disable the Nagle algorithm (see RFC1122) to minimize the latency.
  124. // See https://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-13#section-6.6
  125. int nodelay = 1;
  126. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_NODELAY, &nodelay, sizeof(nodelay)))
  127. throw std::runtime_error("Could not set socket option SCTP_NODELAY, errno=" +
  128. std::to_string(errno));
  129. struct sctp_paddrparams spp = {};
  130. #if USE_PMTUD
  131. // Enable SCTP path MTU discovery
  132. spp.spp_flags = SPP_PMTUD_ENABLE;
  133. #else
  134. // Fall back to a safe MTU value.
  135. spp.spp_flags = SPP_PMTUD_DISABLE;
  136. spp.spp_pathmtu = 1200; // Max safe value recommended by RFC 8261
  137. // See https://tools.ietf.org/html/rfc8261#section-5
  138. #endif
  139. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS, &spp, sizeof(spp)))
  140. throw std::runtime_error("Could not set socket option SCTP_PEER_ADDR_PARAMS, errno=" +
  141. std::to_string(errno));
  142. // The IETF draft recommends the number of streams negotiated during SCTP association to be
  143. // 65535. See https://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-13#section-6.2
  144. struct sctp_initmsg sinit = {};
  145. sinit.sinit_num_ostreams = 65535;
  146. sinit.sinit_max_instreams = 65535;
  147. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_INITMSG, &sinit, sizeof(sinit)))
  148. throw std::runtime_error("Could not set socket option SCTP_INITMSG, errno=" +
  149. std::to_string(errno));
  150. // Prevent fragmented interleave of messages (i.e. level 0), see RFC 6458 8.1.20.
  151. // Unless the user has set the fragmentation interleave level to 0, notifications
  152. // may also be interleaved with partially delivered messages.
  153. int level = 0;
  154. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_FRAGMENT_INTERLEAVE, &level, sizeof(level)))
  155. throw std::runtime_error("Could not disable SCTP fragmented interleave, errno=" +
  156. std::to_string(errno));
  157. // The default send and receive window size of usrsctp is 256KiB, which is too small for
  158. // realistic RTTs, therefore we increase it to 1MiB for better performance.
  159. // See https://bugzilla.mozilla.org/show_bug.cgi?id=1051685
  160. int bufferSize = 1024 * 1024;
  161. if (usrsctp_setsockopt(mSock, SOL_SOCKET, SO_RCVBUF, &bufferSize, sizeof(bufferSize)))
  162. throw std::runtime_error("Could not set SCTP recv buffer size, errno=" +
  163. std::to_string(errno));
  164. if (usrsctp_setsockopt(mSock, SOL_SOCKET, SO_SNDBUF, &bufferSize, sizeof(bufferSize)))
  165. throw std::runtime_error("Could not set SCTP send buffer size, errno=" +
  166. std::to_string(errno));
  167. }
  168. SctpTransport::~SctpTransport() {
  169. stop();
  170. close();
  171. usrsctp_deregister_address(this);
  172. {
  173. std::unique_lock lock(InstancesMutex);
  174. Instances.erase(this);
  175. }
  176. }
  177. void SctpTransport::start() {
  178. Transport::start();
  179. registerIncoming();
  180. connect();
  181. }
  182. bool SctpTransport::stop() {
  183. // Transport::stop() will unregister incoming() from the lower layer, therefore we need to make
  184. // sure the thread from lower layers is not blocked in incoming() by the WrittenOnce condition.
  185. mWrittenOnce = true;
  186. mWrittenCondition.notify_all();
  187. if (!Transport::stop())
  188. return false;
  189. mSendQueue.stop();
  190. safeFlush();
  191. shutdown();
  192. onRecv(nullptr);
  193. return true;
  194. }
  195. void SctpTransport::close() {
  196. if (mSock) {
  197. mProcessor.join();
  198. usrsctp_close(mSock);
  199. mSock = nullptr;
  200. }
  201. }
  202. void SctpTransport::connect() {
  203. if (!mSock)
  204. throw std::logic_error("Attempted SCTP connect with closed socket");
  205. PLOG_DEBUG << "SCTP connecting";
  206. changeState(State::Connecting);
  207. struct sockaddr_conn sconn = {};
  208. sconn.sconn_family = AF_CONN;
  209. sconn.sconn_port = htons(mPort);
  210. sconn.sconn_addr = this;
  211. #ifdef HAVE_SCONN_LEN
  212. sconn.sconn_len = sizeof(sconn);
  213. #endif
  214. if (usrsctp_bind(mSock, reinterpret_cast<struct sockaddr *>(&sconn), sizeof(sconn)))
  215. throw std::runtime_error("Could not bind usrsctp socket, errno=" + std::to_string(errno));
  216. // According to the IETF draft, both endpoints must initiate the SCTP association, in a
  217. // simultaneous-open manner, irrelevent to the SDP setup role.
  218. // See https://tools.ietf.org/html/draft-ietf-mmusic-sctp-sdp-26#section-9.3
  219. int ret = usrsctp_connect(mSock, reinterpret_cast<struct sockaddr *>(&sconn), sizeof(sconn));
  220. if (ret && errno != EINPROGRESS)
  221. throw std::runtime_error("Connection attempt failed, errno=" + std::to_string(errno));
  222. }
  223. void SctpTransport::shutdown() {
  224. if (!mSock)
  225. return;
  226. PLOG_DEBUG << "SCTP shutdown";
  227. if (usrsctp_shutdown(mSock, SHUT_RDWR) != 0 && errno != ENOTCONN) {
  228. PLOG_WARNING << "SCTP shutdown failed, errno=" << errno;
  229. }
  230. close();
  231. PLOG_INFO << "SCTP disconnected";
  232. changeState(State::Disconnected);
  233. mWrittenCondition.notify_all();
  234. }
  235. bool SctpTransport::send(message_ptr message) {
  236. std::lock_guard lock(mSendMutex);
  237. if (!message)
  238. return trySendQueue();
  239. PLOG_VERBOSE << "Send size=" << message->size();
  240. // Flush the queue, and if nothing is pending, try to send directly
  241. if (trySendQueue() && trySendMessage(message))
  242. return true;
  243. mSendQueue.push(message);
  244. updateBufferedAmount(uint16_t(message->stream), long(message_size_func(message)));
  245. return false;
  246. }
  247. void SctpTransport::closeStream(unsigned int stream) {
  248. send(make_message(0, Message::Reset, uint16_t(stream)));
  249. }
  250. void SctpTransport::flush() {
  251. std::lock_guard lock(mSendMutex);
  252. trySendQueue();
  253. }
  254. void SctpTransport::incoming(message_ptr message) {
  255. // There could be a race condition here where we receive the remote INIT before the local one is
  256. // sent, which would result in the connection being aborted. Therefore, we need to wait for data
  257. // to be sent on our side (i.e. the local INIT) before proceeding.
  258. if (!mWrittenOnce) { // test the atomic boolean is not set first to prevent a lock contention
  259. std::unique_lock lock(mWriteMutex);
  260. mWrittenCondition.wait(lock, [&]() { return mWrittenOnce.load(); });
  261. }
  262. if (!message) {
  263. PLOG_INFO << "SCTP disconnected";
  264. changeState(State::Disconnected);
  265. recv(nullptr);
  266. return;
  267. }
  268. PLOG_VERBOSE << "Incoming size=" << message->size();
  269. usrsctp_conninput(this, message->data(), message->size(), 0);
  270. }
  271. bool SctpTransport::outgoing(message_ptr message) {
  272. // Set recommended medium-priority DSCP value
  273. // See https://tools.ietf.org/html/draft-ietf-tsvwg-rtcweb-qos-18
  274. message->dscp = 10; // AF11: Assured Forwarding class 1, low drop probability
  275. return Transport::outgoing(std::move(message));
  276. }
  277. void SctpTransport::doRecv() {
  278. std::lock_guard lock(mRecvMutex);
  279. --mPendingRecvCount;
  280. try {
  281. while (true) {
  282. const size_t bufferSize = 65536;
  283. byte buffer[bufferSize];
  284. socklen_t fromlen = 0;
  285. struct sctp_rcvinfo info = {};
  286. socklen_t infolen = sizeof(info);
  287. unsigned int infotype = 0;
  288. int flags = 0;
  289. ssize_t len = usrsctp_recvv(mSock, buffer, bufferSize, nullptr, &fromlen, &info,
  290. &infolen, &infotype, &flags);
  291. if (len < 0) {
  292. if (errno == EWOULDBLOCK || errno == EAGAIN || errno == ECONNRESET)
  293. break;
  294. else
  295. throw std::runtime_error("SCTP recv failed, errno=" + std::to_string(errno));
  296. }
  297. PLOG_VERBOSE << "SCTP recv, len=" << len;
  298. // SCTP_FRAGMENT_INTERLEAVE does not seem to work as expected for messages > 64KB,
  299. // therefore partial notifications and messages need to be handled separately.
  300. if (flags & MSG_NOTIFICATION) {
  301. // SCTP event notification
  302. mPartialNotification.insert(mPartialNotification.end(), buffer, buffer + len);
  303. if (flags & MSG_EOR) {
  304. // Notification is complete, process it
  305. auto notification =
  306. reinterpret_cast<union sctp_notification *>(mPartialNotification.data());
  307. processNotification(notification, mPartialNotification.size());
  308. mPartialNotification.clear();
  309. }
  310. } else {
  311. // SCTP message
  312. mPartialMessage.insert(mPartialMessage.end(), buffer, buffer + len);
  313. if (flags & MSG_EOR) {
  314. // Message is complete, process it
  315. if (infotype != SCTP_RECVV_RCVINFO)
  316. throw std::runtime_error("Missing SCTP recv info");
  317. processData(std::move(mPartialMessage), info.rcv_sid,
  318. PayloadId(ntohl(info.rcv_ppid)));
  319. mPartialMessage.clear();
  320. }
  321. }
  322. }
  323. } catch (const std::exception &e) {
  324. PLOG_WARNING << e.what();
  325. }
  326. }
  327. bool SctpTransport::trySendQueue() {
  328. // Requires mSendMutex to be locked
  329. while (auto next = mSendQueue.peek()) {
  330. message_ptr message = std::move(*next);
  331. if (!trySendMessage(message))
  332. return false;
  333. mSendQueue.pop();
  334. updateBufferedAmount(uint16_t(message->stream), -long(message_size_func(message)));
  335. }
  336. return true;
  337. }
  338. bool SctpTransport::trySendMessage(message_ptr message) {
  339. // Requires mSendMutex to be locked
  340. if (!mSock || state() != State::Connected)
  341. return false;
  342. uint32_t ppid;
  343. switch (message->type) {
  344. case Message::String:
  345. ppid = !message->empty() ? PPID_STRING : PPID_STRING_EMPTY;
  346. break;
  347. case Message::Binary:
  348. ppid = !message->empty() ? PPID_BINARY : PPID_BINARY_EMPTY;
  349. break;
  350. case Message::Control:
  351. ppid = PPID_CONTROL;
  352. break;
  353. case Message::Reset:
  354. sendReset(uint16_t(message->stream));
  355. return true;
  356. default:
  357. // Ignore
  358. return true;
  359. }
  360. PLOG_VERBOSE << "SCTP try send size=" << message->size();
  361. // TODO: Implement SCTP ndata specification draft when supported everywhere
  362. // See https://tools.ietf.org/html/draft-ietf-tsvwg-sctp-ndata-08
  363. const Reliability reliability = message->reliability ? *message->reliability : Reliability();
  364. struct sctp_sendv_spa spa = {};
  365. // set sndinfo
  366. spa.sendv_flags |= SCTP_SEND_SNDINFO_VALID;
  367. spa.sendv_sndinfo.snd_sid = uint16_t(message->stream);
  368. spa.sendv_sndinfo.snd_ppid = htonl(ppid);
  369. spa.sendv_sndinfo.snd_flags |= SCTP_EOR; // implicit here
  370. // set prinfo
  371. spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
  372. if (reliability.unordered)
  373. spa.sendv_sndinfo.snd_flags |= SCTP_UNORDERED;
  374. switch (reliability.type) {
  375. case Reliability::Type::Rexmit:
  376. spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
  377. spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_RTX;
  378. spa.sendv_prinfo.pr_value = uint32_t(std::get<int>(reliability.rexmit));
  379. break;
  380. case Reliability::Type::Timed:
  381. spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
  382. spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_TTL;
  383. spa.sendv_prinfo.pr_value = uint32_t(std::get<milliseconds>(reliability.rexmit).count());
  384. break;
  385. default:
  386. spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_NONE;
  387. break;
  388. }
  389. ssize_t ret;
  390. if (!message->empty()) {
  391. ret = usrsctp_sendv(mSock, message->data(), message->size(), nullptr, 0, &spa, sizeof(spa),
  392. SCTP_SENDV_SPA, 0);
  393. } else {
  394. const char zero = 0;
  395. ret = usrsctp_sendv(mSock, &zero, 1, nullptr, 0, &spa, sizeof(spa), SCTP_SENDV_SPA, 0);
  396. }
  397. if (ret < 0) {
  398. if (errno == EWOULDBLOCK || errno == EAGAIN) {
  399. PLOG_VERBOSE << "SCTP sending not possible";
  400. return false;
  401. }
  402. PLOG_ERROR << "SCTP sending failed, errno=" << errno;
  403. throw std::runtime_error("Sending failed, errno=" + std::to_string(errno));
  404. }
  405. PLOG_VERBOSE << "SCTP sent size=" << message->size();
  406. if (message->type == Message::Type::Binary || message->type == Message::Type::String)
  407. mBytesSent += message->size();
  408. return true;
  409. }
  410. void SctpTransport::updateBufferedAmount(uint16_t streamId, long delta) {
  411. // Requires mSendMutex to be locked
  412. auto it = mBufferedAmount.insert(std::make_pair(streamId, 0)).first;
  413. size_t amount = size_t(std::max(long(it->second) + delta, long(0)));
  414. if (amount == 0)
  415. mBufferedAmount.erase(it);
  416. else
  417. it->second = amount;
  418. mSendMutex.unlock();
  419. try {
  420. mBufferedAmountCallback(streamId, amount);
  421. } catch (const std::exception &e) {
  422. PLOG_DEBUG << "SCTP buffered amount callback: " << e.what();
  423. }
  424. mSendMutex.lock();
  425. }
  426. void SctpTransport::sendReset(uint16_t streamId) {
  427. // Requires mSendMutex to be locked
  428. if (!mSock || state() != State::Connected)
  429. return;
  430. PLOG_DEBUG << "SCTP resetting stream " << streamId;
  431. using srs_t = struct sctp_reset_streams;
  432. const size_t len = sizeof(srs_t) + sizeof(uint16_t);
  433. byte buffer[len] = {};
  434. srs_t &srs = *reinterpret_cast<srs_t *>(buffer);
  435. srs.srs_flags = SCTP_STREAM_RESET_OUTGOING;
  436. srs.srs_number_streams = 1;
  437. srs.srs_stream_list[0] = streamId;
  438. mWritten = false;
  439. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_RESET_STREAMS, &srs, len) == 0) {
  440. std::unique_lock lock(mWriteMutex); // locking before setsockopt might deadlock usrsctp...
  441. mWrittenCondition.wait_for(lock, 1000ms,
  442. [&]() { return mWritten || state() != State::Connected; });
  443. } else if (errno == EINVAL) {
  444. PLOG_DEBUG << "SCTP stream " << streamId << " already reset";
  445. } else {
  446. PLOG_WARNING << "SCTP reset stream " << streamId << " failed, errno=" << errno;
  447. }
  448. }
  449. bool SctpTransport::safeFlush() {
  450. try {
  451. flush();
  452. return true;
  453. } catch (const std::exception &e) {
  454. PLOG_WARNING << "SCTP flush: " << e.what();
  455. return false;
  456. }
  457. }
  458. void SctpTransport::handleUpcall() {
  459. if (!mSock)
  460. return;
  461. PLOG_VERBOSE << "Handle upcall";
  462. int events = usrsctp_get_events(mSock);
  463. if (events & SCTP_EVENT_READ && mPendingRecvCount == 0) {
  464. ++mPendingRecvCount;
  465. mProcessor.enqueue(&SctpTransport::doRecv, this);
  466. }
  467. if (events & SCTP_EVENT_WRITE)
  468. mProcessor.enqueue(&SctpTransport::safeFlush, this);
  469. }
  470. int SctpTransport::handleWrite(byte *data, size_t len, uint8_t /*tos*/, uint8_t /*set_df*/) {
  471. try {
  472. std::unique_lock lock(mWriteMutex);
  473. PLOG_VERBOSE << "Handle write, len=" << len;
  474. auto message = make_message(data, data + len);
  475. if (!outgoing(make_message(data, data + len)))
  476. return -1;
  477. mWritten = true;
  478. mWrittenOnce = true;
  479. mWrittenCondition.notify_all();
  480. } catch (const std::exception &e) {
  481. PLOG_ERROR << "SCTP write: " << e.what();
  482. return -1;
  483. }
  484. return 0; // success
  485. }
  486. void SctpTransport::processData(binary &&data, uint16_t sid, PayloadId ppid) {
  487. PLOG_VERBOSE << "Process data, size=" << data.size();
  488. // The usage of the PPIDs "WebRTC String Partial" and "WebRTC Binary Partial" is deprecated.
  489. // See https://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-13#section-6.6
  490. // We handle them at reception for compatibility reasons but should never send them.
  491. switch (ppid) {
  492. case PPID_CONTROL:
  493. recv(make_message(std::move(data), Message::Control, sid));
  494. break;
  495. case PPID_STRING_PARTIAL: // deprecated
  496. mPartialStringData.insert(mPartialStringData.end(), data.begin(), data.end());
  497. break;
  498. case PPID_STRING:
  499. if (mPartialStringData.empty()) {
  500. mBytesReceived += data.size();
  501. recv(make_message(std::move(data), Message::String, sid));
  502. } else {
  503. mPartialStringData.insert(mPartialStringData.end(), data.begin(), data.end());
  504. mBytesReceived += mPartialStringData.size();
  505. recv(make_message(std::move(mPartialStringData), Message::String, sid));
  506. mPartialStringData.clear();
  507. }
  508. break;
  509. case PPID_STRING_EMPTY:
  510. recv(make_message(std::move(mPartialStringData), Message::String, sid));
  511. mPartialStringData.clear();
  512. break;
  513. case PPID_BINARY_PARTIAL: // deprecated
  514. mPartialBinaryData.insert(mPartialBinaryData.end(), data.begin(), data.end());
  515. break;
  516. case PPID_BINARY:
  517. if (mPartialBinaryData.empty()) {
  518. mBytesReceived += data.size();
  519. recv(make_message(std::move(data), Message::Binary, sid));
  520. } else {
  521. mPartialBinaryData.insert(mPartialBinaryData.end(), data.begin(), data.end());
  522. mBytesReceived += mPartialBinaryData.size();
  523. recv(make_message(std::move(mPartialBinaryData), Message::Binary, sid));
  524. mPartialBinaryData.clear();
  525. }
  526. break;
  527. case PPID_BINARY_EMPTY:
  528. recv(make_message(std::move(mPartialBinaryData), Message::Binary, sid));
  529. mPartialBinaryData.clear();
  530. break;
  531. default:
  532. // Unknown
  533. PLOG_WARNING << "Unknown PPID: " << uint32_t(ppid);
  534. return;
  535. }
  536. }
  537. void SctpTransport::processNotification(const union sctp_notification *notify, size_t len) {
  538. if (len != size_t(notify->sn_header.sn_length)) {
  539. PLOG_WARNING << "Invalid notification length";
  540. return;
  541. }
  542. auto type = notify->sn_header.sn_type;
  543. PLOG_VERBOSE << "Processing notification, type=" << type;
  544. switch (type) {
  545. case SCTP_ASSOC_CHANGE: {
  546. const struct sctp_assoc_change &assoc_change = notify->sn_assoc_change;
  547. if (assoc_change.sac_state == SCTP_COMM_UP) {
  548. PLOG_INFO << "SCTP connected";
  549. changeState(State::Connected);
  550. } else {
  551. if (state() == State::Connecting) {
  552. PLOG_ERROR << "SCTP connection failed";
  553. changeState(State::Failed);
  554. } else {
  555. PLOG_INFO << "SCTP disconnected";
  556. changeState(State::Disconnected);
  557. }
  558. mWrittenCondition.notify_all();
  559. }
  560. break;
  561. }
  562. case SCTP_SENDER_DRY_EVENT: {
  563. PLOG_VERBOSE << "SCTP dry event";
  564. // It should not be necessary since the send callback should have been called already,
  565. // but to be sure, let's try to send now.
  566. safeFlush();
  567. break;
  568. }
  569. case SCTP_STREAM_RESET_EVENT: {
  570. const struct sctp_stream_reset_event &reset_event = notify->sn_strreset_event;
  571. const int count = (reset_event.strreset_length - sizeof(reset_event)) / sizeof(uint16_t);
  572. const uint16_t flags = reset_event.strreset_flags;
  573. IF_PLOG(plog::verbose) {
  574. std::ostringstream desc;
  575. desc << "flags=";
  576. if (flags & SCTP_STREAM_RESET_OUTGOING_SSN && flags & SCTP_STREAM_RESET_INCOMING_SSN)
  577. desc << "outgoing|incoming";
  578. else if (flags & SCTP_STREAM_RESET_OUTGOING_SSN)
  579. desc << "outgoing";
  580. else if (flags & SCTP_STREAM_RESET_INCOMING_SSN)
  581. desc << "incoming";
  582. else
  583. desc << "0";
  584. desc << ", streams=[";
  585. for (int i = 0; i < count; ++i) {
  586. uint16_t streamId = reset_event.strreset_stream_list[i];
  587. desc << (i != 0 ? "," : "") << streamId;
  588. }
  589. desc << "]";
  590. PLOG_VERBOSE << "SCTP reset event, " << desc.str();
  591. }
  592. if (flags & SCTP_STREAM_RESET_OUTGOING_SSN) {
  593. for (int i = 0; i < count; ++i) {
  594. uint16_t streamId = reset_event.strreset_stream_list[i];
  595. closeStream(streamId);
  596. }
  597. }
  598. if (flags & SCTP_STREAM_RESET_INCOMING_SSN) {
  599. const byte dataChannelCloseMessage{0x04};
  600. for (int i = 0; i < count; ++i) {
  601. uint16_t streamId = reset_event.strreset_stream_list[i];
  602. recv(make_message(&dataChannelCloseMessage, &dataChannelCloseMessage + 1,
  603. Message::Control, streamId));
  604. }
  605. }
  606. break;
  607. }
  608. default:
  609. // Ignore
  610. break;
  611. }
  612. }
  613. void SctpTransport::clearStats() {
  614. mBytesReceived = 0;
  615. mBytesSent = 0;
  616. }
  617. size_t SctpTransport::bytesSent() { return mBytesSent; }
  618. size_t SctpTransport::bytesReceived() { return mBytesReceived; }
  619. std::optional<milliseconds> SctpTransport::rtt() {
  620. if (!mSock || state() != State::Connected)
  621. return nullopt;
  622. struct sctp_status status = {};
  623. socklen_t len = sizeof(status);
  624. if (usrsctp_getsockopt(mSock, IPPROTO_SCTP, SCTP_STATUS, &status, &len)) {
  625. PLOG_WARNING << "Could not read SCTP_STATUS";
  626. return nullopt;
  627. }
  628. return milliseconds(status.sstat_primary.spinfo_srtt);
  629. }
  630. void SctpTransport::UpcallCallback(struct socket *, void *arg, int /* flags */) {
  631. auto *transport = static_cast<SctpTransport *>(arg);
  632. std::shared_lock lock(InstancesMutex);
  633. if (Instances.find(transport) == Instances.end())
  634. return;
  635. transport->handleUpcall();
  636. }
  637. int SctpTransport::WriteCallback(void *ptr, void *data, size_t len, uint8_t tos, uint8_t set_df) {
  638. auto *transport = static_cast<SctpTransport *>(ptr);
  639. // Workaround for sctplab/usrsctp#405: Send callback is invoked on already closed socket
  640. // https://github.com/sctplab/usrsctp/issues/405
  641. std::shared_lock lock(InstancesMutex);
  642. if (Instances.find(transport) == Instances.end())
  643. return -1;
  644. return transport->handleWrite(static_cast<byte *>(data), len, tos, set_df);
  645. }
  646. } // namespace rtc