sctptransport.cpp 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674
  1. /**
  2. * Copyright (c) 2019 Paul-Louis Ageneau
  3. *
  4. * This library is free software; you can redistribute it and/or
  5. * modify it under the terms of the GNU Lesser General Public
  6. * License as published by the Free Software Foundation; either
  7. * version 2.1 of the License, or (at your option) any later version.
  8. *
  9. * This library is distributed in the hope that it will be useful,
  10. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  12. * Lesser General Public License for more details.
  13. *
  14. * You should have received a copy of the GNU Lesser General Public
  15. * License along with this library; if not, write to the Free Software
  16. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  17. */
  18. #include "sctptransport.hpp"
  19. #include <chrono>
  20. #include <exception>
  21. #include <iostream>
  22. #include <thread>
  23. #include <vector>
  24. #ifdef USE_JUICE
  25. #ifndef __APPLE__
  26. // libjuice enables Linux path MTU discovery or sets the DF flag
  27. #define USE_PMTUD 1
  28. #else
  29. // Setting the DF flag is not available on Mac OS
  30. #define USE_PMTUD 0
  31. #endif
  32. #else
  33. #ifdef __linux__
  34. // Linux UDP does path MTU discovery by default (setting DF and returning EMSGSIZE)
  35. // It should be safe to enable discovery for SCTP.
  36. #define USE_PMTUD 1
  37. #else
  38. // Otherwise assume fragmentation
  39. #define USE_PMTUD 0
  40. #endif
  41. #endif
  42. using namespace std::chrono_literals;
  43. using namespace std::chrono;
  44. using std::shared_ptr;
  45. namespace rtc {
  46. void SctpTransport::Init() {
  47. usrsctp_init(0, &SctpTransport::WriteCallback, nullptr);
  48. usrsctp_sysctl_set_sctp_ecn_enable(0);
  49. usrsctp_sysctl_set_sctp_init_rtx_max_default(5);
  50. usrsctp_sysctl_set_sctp_path_rtx_max_default(5);
  51. usrsctp_sysctl_set_sctp_assoc_rtx_max_default(5); // single path
  52. usrsctp_sysctl_set_sctp_rto_min_default(1 * 1000); // ms
  53. usrsctp_sysctl_set_sctp_rto_max_default(10 * 1000); // ms
  54. usrsctp_sysctl_set_sctp_rto_initial_default(1 * 1000); // ms
  55. usrsctp_sysctl_set_sctp_init_rto_max_default(10 * 1000); // ms
  56. usrsctp_sysctl_set_sctp_heartbeat_interval_default(10 * 1000); // ms
  57. usrsctp_sysctl_set_sctp_max_chunks_on_queue(10 * 1024);
  58. // Change congestion control from the default TCP Reno (RFC 2581) to HighSpeed TCP (RFC 3649)
  59. usrsctp_sysctl_set_sctp_default_cc_module(SCTP_CC_HSTCP);
  60. // Increase the initial window size to 10 MTUs (RFC 6928)
  61. usrsctp_sysctl_set_sctp_initial_cwnd(10);
  62. }
  63. void SctpTransport::Cleanup() {
  64. while (usrsctp_finish() != 0)
  65. std::this_thread::sleep_for(100ms);
  66. }
  67. SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
  68. message_callback recvCallback, amount_callback bufferedAmountCallback,
  69. state_callback stateChangeCallback)
  70. : Transport(lower, std::move(stateChangeCallback)), mPort(port),
  71. mSendQueue(0, message_size_func), mBufferedAmountCallback(std::move(bufferedAmountCallback)) {
  72. onRecv(recvCallback);
  73. PLOG_DEBUG << "Initializing SCTP transport";
  74. usrsctp_register_address(this);
  75. mSock = usrsctp_socket(AF_CONN, SOCK_STREAM, IPPROTO_SCTP, &SctpTransport::RecvCallback,
  76. &SctpTransport::SendCallback, 0, this);
  77. if (!mSock)
  78. throw std::runtime_error("Could not create SCTP socket, errno=" + std::to_string(errno));
  79. if (usrsctp_set_non_blocking(mSock, 1))
  80. throw std::runtime_error("Unable to set non-blocking mode, errno=" + std::to_string(errno));
  81. // SCTP must stop sending after the lower layer is shut down, so disable linger
  82. struct linger sol = {};
  83. sol.l_onoff = 1;
  84. sol.l_linger = 0;
  85. if (usrsctp_setsockopt(mSock, SOL_SOCKET, SO_LINGER, &sol, sizeof(sol)))
  86. throw std::runtime_error("Could not set socket option SO_LINGER, errno=" +
  87. std::to_string(errno));
  88. struct sctp_assoc_value av = {};
  89. av.assoc_id = SCTP_ALL_ASSOC;
  90. av.assoc_value = 1;
  91. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, &av, sizeof(av)))
  92. throw std::runtime_error("Could not set socket option SCTP_ENABLE_STREAM_RESET, errno=" +
  93. std::to_string(errno));
  94. struct sctp_event se = {};
  95. se.se_assoc_id = SCTP_ALL_ASSOC;
  96. se.se_on = 1;
  97. se.se_type = SCTP_ASSOC_CHANGE;
  98. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_EVENT, &se, sizeof(se)))
  99. throw std::runtime_error("Could not subscribe to event SCTP_ASSOC_CHANGE, errno=" +
  100. std::to_string(errno));
  101. se.se_type = SCTP_SENDER_DRY_EVENT;
  102. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_EVENT, &se, sizeof(se)))
  103. throw std::runtime_error("Could not subscribe to event SCTP_SENDER_DRY_EVENT, errno=" +
  104. std::to_string(errno));
  105. se.se_type = SCTP_STREAM_RESET_EVENT;
  106. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_EVENT, &se, sizeof(se)))
  107. throw std::runtime_error("Could not subscribe to event SCTP_STREAM_RESET_EVENT, errno=" +
  108. std::to_string(errno));
  109. // The sender SHOULD disable the Nagle algorithm (see RFC1122) to minimize the latency.
  110. // See https://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-13#section-6.6
  111. int nodelay = 1;
  112. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_NODELAY, &nodelay, sizeof(nodelay)))
  113. throw std::runtime_error("Could not set socket option SCTP_NODELAY, errno=" +
  114. std::to_string(errno));
  115. struct sctp_paddrparams spp = {};
  116. #if USE_PMTUD
  117. // Enabled SCTP path MTU discovery
  118. spp.spp_flags = SPP_PMTUD_ENABLE;
  119. #else
  120. // Fall back to a safe MTU value.
  121. spp.spp_flags = SPP_PMTUD_DISABLE;
  122. spp.spp_pathmtu = 1200; // Max safe value recommended by RFC 8261
  123. // See https://tools.ietf.org/html/rfc8261#section-5
  124. #endif
  125. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS, &spp, sizeof(spp)))
  126. throw std::runtime_error("Could not set socket option SCTP_PEER_ADDR_PARAMS, errno=" +
  127. std::to_string(errno));
  128. // The IETF draft recommends the number of streams negotiated during SCTP association to be
  129. // 65535. See https://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-13#section-6.2
  130. struct sctp_initmsg sinit = {};
  131. sinit.sinit_num_ostreams = 65535;
  132. sinit.sinit_max_instreams = 65535;
  133. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_INITMSG, &sinit, sizeof(sinit)))
  134. throw std::runtime_error("Could not set socket option SCTP_INITMSG, errno=" +
  135. std::to_string(errno));
  136. // Prevent fragmented interleave of messages (i.e. level 0), see RFC 6458 8.1.20.
  137. // Unless the user has set the fragmentation interleave level to 0, notifications
  138. // may also be interleaved with partially delivered messages.
  139. int level = 0;
  140. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_FRAGMENT_INTERLEAVE, &level, sizeof(level)))
  141. throw std::runtime_error("Could not disable SCTP fragmented interleave, errno=" +
  142. std::to_string(errno));
  143. // The default send and receive window size of usrsctp is 256KiB, which is too small for
  144. // realistic RTTs, therefore we increase it to 1MiB for better performance.
  145. // See https://bugzilla.mozilla.org/show_bug.cgi?id=1051685
  146. int bufferSize = 1024 * 1024;
  147. if (usrsctp_setsockopt(mSock, SOL_SOCKET, SO_RCVBUF, &bufferSize, sizeof(bufferSize)))
  148. throw std::runtime_error("Could not set SCTP recv buffer size, errno=" +
  149. std::to_string(errno));
  150. if (usrsctp_setsockopt(mSock, SOL_SOCKET, SO_SNDBUF, &bufferSize, sizeof(bufferSize)))
  151. throw std::runtime_error("Could not set SCTP send buffer size, errno=" +
  152. std::to_string(errno));
  153. registerIncoming();
  154. connect();
  155. }
  156. SctpTransport::~SctpTransport() {
  157. stop();
  158. if (mSock)
  159. usrsctp_close(mSock);
  160. usrsctp_deregister_address(this);
  161. }
  162. bool SctpTransport::stop() {
  163. if (!Transport::stop())
  164. return false;
  165. mSendQueue.stop();
  166. safeFlush();
  167. shutdown();
  168. onRecv(nullptr);
  169. return true;
  170. }
  171. void SctpTransport::connect() {
  172. if (!mSock)
  173. return;
  174. PLOG_DEBUG << "SCTP connect";
  175. changeState(State::Connecting);
  176. struct sockaddr_conn sconn = {};
  177. sconn.sconn_family = AF_CONN;
  178. sconn.sconn_port = htons(mPort);
  179. sconn.sconn_addr = this;
  180. #ifdef HAVE_SCONN_LEN
  181. sconn.sconn_len = sizeof(sconn);
  182. #endif
  183. if (usrsctp_bind(mSock, reinterpret_cast<struct sockaddr *>(&sconn), sizeof(sconn)))
  184. throw std::runtime_error("Could not bind usrsctp socket, errno=" + std::to_string(errno));
  185. // According to the IETF draft, both endpoints must initiate the SCTP association, in a
  186. // simultaneous-open manner, irrelevent to the SDP setup role.
  187. // See https://tools.ietf.org/html/draft-ietf-mmusic-sctp-sdp-26#section-9.3
  188. int ret = usrsctp_connect(mSock, reinterpret_cast<struct sockaddr *>(&sconn), sizeof(sconn));
  189. if (ret && errno != EINPROGRESS)
  190. throw std::runtime_error("Connection attempt failed, errno=" + std::to_string(errno));
  191. }
  192. void SctpTransport::shutdown() {
  193. if (!mSock)
  194. return;
  195. PLOG_DEBUG << "SCTP shutdown";
  196. if (usrsctp_shutdown(mSock, SHUT_RDWR) != 0 && errno != ENOTCONN) {
  197. PLOG_WARNING << "SCTP shutdown failed, errno=" << errno;
  198. }
  199. // close() abort the connection when linger is disabled, call it now
  200. usrsctp_close(mSock);
  201. mSock = nullptr;
  202. PLOG_INFO << "SCTP disconnected";
  203. changeState(State::Disconnected);
  204. mWrittenCondition.notify_all();
  205. }
  206. bool SctpTransport::send(message_ptr message) {
  207. std::lock_guard lock(mSendMutex);
  208. if (!message)
  209. return mSendQueue.empty();
  210. PLOG_VERBOSE << "Send size=" << message->size();
  211. // If nothing is pending, try to send directly
  212. if (mSendQueue.empty() && trySendMessage(message))
  213. return true;
  214. mSendQueue.push(message);
  215. updateBufferedAmount(message->stream, message_size_func(message));
  216. return false;
  217. }
  218. void SctpTransport::close(unsigned int stream) {
  219. send(make_message(0, Message::Reset, uint16_t(stream)));
  220. }
  221. void SctpTransport::flush() {
  222. std::lock_guard lock(mSendMutex);
  223. trySendQueue();
  224. }
  225. void SctpTransport::incoming(message_ptr message) {
  226. // There could be a race condition here where we receive the remote INIT before the local one is
  227. // sent, which would result in the connection being aborted. Therefore, we need to wait for data
  228. // to be sent on our side (i.e. the local INIT) before proceeding.
  229. {
  230. std::unique_lock lock(mWriteMutex);
  231. mWrittenCondition.wait(lock, [&]() { return mWrittenOnce || state() != State::Connected; });
  232. }
  233. if (!message) {
  234. PLOG_INFO << "SCTP disconnected";
  235. changeState(State::Disconnected);
  236. recv(nullptr);
  237. return;
  238. }
  239. PLOG_VERBOSE << "Incoming size=" << message->size();
  240. usrsctp_conninput(this, message->data(), message->size(), 0);
  241. }
  242. bool SctpTransport::trySendQueue() {
  243. // Requires mSendMutex to be locked
  244. while (auto next = mSendQueue.peek()) {
  245. auto message = *next;
  246. if (!trySendMessage(message))
  247. return false;
  248. mSendQueue.pop();
  249. updateBufferedAmount(message->stream, -message_size_func(message));
  250. }
  251. return true;
  252. }
  253. bool SctpTransport::trySendMessage(message_ptr message) {
  254. // Requires mSendMutex to be locked
  255. if (!mSock || state() != State::Connected)
  256. return false;
  257. uint32_t ppid;
  258. switch (message->type) {
  259. case Message::String:
  260. ppid = !message->empty() ? PPID_STRING : PPID_STRING_EMPTY;
  261. break;
  262. case Message::Binary:
  263. ppid = !message->empty() ? PPID_BINARY : PPID_BINARY_EMPTY;
  264. break;
  265. case Message::Control:
  266. ppid = PPID_CONTROL;
  267. break;
  268. case Message::Reset:
  269. sendReset(message->stream);
  270. return true;
  271. default:
  272. // Ignore
  273. return true;
  274. }
  275. PLOG_VERBOSE << "SCTP try send size=" << message->size();
  276. // TODO: Implement SCTP ndata specification draft when supported everywhere
  277. // See https://tools.ietf.org/html/draft-ietf-tsvwg-sctp-ndata-08
  278. const Reliability reliability = message->reliability ? *message->reliability : Reliability();
  279. struct sctp_sendv_spa spa = {};
  280. // set sndinfo
  281. spa.sendv_flags |= SCTP_SEND_SNDINFO_VALID;
  282. spa.sendv_sndinfo.snd_sid = uint16_t(message->stream);
  283. spa.sendv_sndinfo.snd_ppid = htonl(ppid);
  284. spa.sendv_sndinfo.snd_flags |= SCTP_EOR; // implicit here
  285. // set prinfo
  286. spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
  287. if (reliability.unordered)
  288. spa.sendv_sndinfo.snd_flags |= SCTP_UNORDERED;
  289. switch (reliability.type) {
  290. case Reliability::TYPE_PARTIAL_RELIABLE_REXMIT:
  291. spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
  292. spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_RTX;
  293. spa.sendv_prinfo.pr_value = uint32_t(std::get<int>(reliability.rexmit));
  294. break;
  295. case Reliability::TYPE_PARTIAL_RELIABLE_TIMED:
  296. spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
  297. spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_TTL;
  298. spa.sendv_prinfo.pr_value = uint32_t(std::get<milliseconds>(reliability.rexmit).count());
  299. break;
  300. default:
  301. spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_NONE;
  302. break;
  303. }
  304. ssize_t ret;
  305. if (!message->empty()) {
  306. ret = usrsctp_sendv(mSock, message->data(), message->size(), nullptr, 0, &spa, sizeof(spa),
  307. SCTP_SENDV_SPA, 0);
  308. } else {
  309. const char zero = 0;
  310. ret = usrsctp_sendv(mSock, &zero, 1, nullptr, 0, &spa, sizeof(spa), SCTP_SENDV_SPA, 0);
  311. }
  312. if (ret >= 0) {
  313. PLOG_VERBOSE << "SCTP sent size=" << message->size();
  314. if (message->type == Message::Type::Binary || message->type == Message::Type::String)
  315. mBytesSent += message->size();
  316. return true;
  317. } else if (errno == EWOULDBLOCK || errno == EAGAIN) {
  318. PLOG_VERBOSE << "SCTP sending not possible";
  319. return false;
  320. } else {
  321. PLOG_ERROR << "SCTP sending failed, errno=" << errno;
  322. throw std::runtime_error("Sending failed, errno=" + std::to_string(errno));
  323. }
  324. }
  325. void SctpTransport::updateBufferedAmount(uint16_t streamId, long delta) {
  326. // Requires mSendMutex to be locked
  327. auto it = mBufferedAmount.insert(std::make_pair(streamId, 0)).first;
  328. size_t amount = size_t(std::max(long(it->second) + delta, long(0)));
  329. if (amount == 0)
  330. mBufferedAmount.erase(it);
  331. else
  332. it->second = amount;
  333. mSendMutex.unlock();
  334. try {
  335. mBufferedAmountCallback(streamId, amount);
  336. } catch (const std::exception &e) {
  337. PLOG_WARNING << "SCTP buffered amount callback: " << e.what();
  338. }
  339. mSendMutex.lock();
  340. }
  341. void SctpTransport::sendReset(uint16_t streamId) {
  342. // Requires mSendMutex to be locked
  343. if (!mSock || state() != State::Connected)
  344. return;
  345. PLOG_DEBUG << "SCTP resetting stream " << streamId;
  346. using srs_t = struct sctp_reset_streams;
  347. const size_t len = sizeof(srs_t) + sizeof(uint16_t);
  348. byte buffer[len] = {};
  349. srs_t &srs = *reinterpret_cast<srs_t *>(buffer);
  350. srs.srs_flags = SCTP_STREAM_RESET_OUTGOING;
  351. srs.srs_number_streams = 1;
  352. srs.srs_stream_list[0] = streamId;
  353. mWritten = false;
  354. if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_RESET_STREAMS, &srs, len) == 0) {
  355. std::unique_lock lock(mWriteMutex); // locking before setsockopt might deadlock usrsctp...
  356. mWrittenCondition.wait_for(lock, 1000ms,
  357. [&]() { return mWritten || state() != State::Connected; });
  358. } else if (errno == EINVAL) {
  359. PLOG_VERBOSE << "SCTP stream " << streamId << " already reset";
  360. } else {
  361. PLOG_WARNING << "SCTP reset stream " << streamId << " failed, errno=" << errno;
  362. }
  363. }
  364. bool SctpTransport::safeFlush() {
  365. try {
  366. flush();
  367. return true;
  368. } catch (const std::exception &e) {
  369. PLOG_WARNING << "SCTP flush: " << e.what();
  370. return false;
  371. }
  372. }
  373. int SctpTransport::handleRecv(struct socket *sock, union sctp_sockstore addr, const byte *data,
  374. size_t len, struct sctp_rcvinfo info, int flags) {
  375. try {
  376. PLOG_VERBOSE << "Handle recv, len=" << len;
  377. if (!len)
  378. return -1;
  379. // This is valid because SCTP_FRAGMENT_INTERLEAVE is set to level 0
  380. // so partial messages and notifications may not be interleaved.
  381. if (flags & MSG_EOR) {
  382. if (!mPartialRecv.empty()) {
  383. mPartialRecv.insert(mPartialRecv.end(), data, data + len);
  384. data = mPartialRecv.data();
  385. len = mPartialRecv.size();
  386. }
  387. // Message/Notification is complete, process it
  388. if (flags & MSG_NOTIFICATION)
  389. processNotification(reinterpret_cast<const union sctp_notification *>(data), len);
  390. else
  391. processData(data, len, info.rcv_sid, PayloadId(htonl(info.rcv_ppid)));
  392. mPartialRecv.clear();
  393. } else {
  394. // Message/Notification is not complete
  395. mPartialRecv.insert(mPartialRecv.end(), data, data + len);
  396. }
  397. } catch (const std::exception &e) {
  398. PLOG_ERROR << "SCTP recv: " << e.what();
  399. return -1;
  400. }
  401. return 0; // success
  402. }
  403. int SctpTransport::handleSend(size_t free) {
  404. PLOG_VERBOSE << "Handle send, free=" << free;
  405. return safeFlush() ? 0 : -1;
  406. }
  407. int SctpTransport::handleWrite(byte *data, size_t len, uint8_t tos, uint8_t set_df) {
  408. try {
  409. PLOG_VERBOSE << "Handle write, len=" << len;
  410. std::unique_lock lock(mWriteMutex);
  411. if (!outgoing(make_message(data, data + len)))
  412. return -1;
  413. mWritten = true;
  414. mWrittenOnce = true;
  415. mWrittenCondition.notify_all();
  416. } catch (const std::exception &e) {
  417. PLOG_ERROR << "SCTP write: " << e.what();
  418. return -1;
  419. }
  420. return 0; // success
  421. }
  422. void SctpTransport::processData(const byte *data, size_t len, uint16_t sid, PayloadId ppid) {
  423. PLOG_VERBOSE << "Process data, len=" << len;
  424. // The usage of the PPIDs "WebRTC String Partial" and "WebRTC Binary Partial" is deprecated.
  425. // See https://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-13#section-6.6
  426. // We handle them at reception for compatibility reasons but should never send them.
  427. switch (ppid) {
  428. case PPID_CONTROL:
  429. recv(make_message(data, data + len, Message::Control, sid));
  430. break;
  431. case PPID_STRING_PARTIAL: // deprecated
  432. mPartialStringData.insert(mPartialStringData.end(), data, data + len);
  433. break;
  434. case PPID_STRING:
  435. if (mPartialStringData.empty()) {
  436. mBytesReceived += len;
  437. recv(make_message(data, data + len, Message::String, sid));
  438. } else {
  439. mPartialStringData.insert(mPartialStringData.end(), data, data + len);
  440. mBytesReceived += mPartialStringData.size();
  441. recv(make_message(mPartialStringData.begin(), mPartialStringData.end(), Message::String,
  442. sid));
  443. mPartialStringData.clear();
  444. }
  445. break;
  446. case PPID_STRING_EMPTY:
  447. // This only accounts for when the partial data is empty
  448. recv(make_message(mPartialStringData.begin(), mPartialStringData.end(), Message::String,
  449. sid));
  450. mPartialStringData.clear();
  451. break;
  452. case PPID_BINARY_PARTIAL: // deprecated
  453. mPartialBinaryData.insert(mPartialBinaryData.end(), data, data + len);
  454. break;
  455. case PPID_BINARY:
  456. if (mPartialBinaryData.empty()) {
  457. mBytesReceived += len;
  458. recv(make_message(data, data + len, Message::Binary, sid));
  459. } else {
  460. mPartialBinaryData.insert(mPartialBinaryData.end(), data, data + len);
  461. mBytesReceived += mPartialStringData.size();
  462. recv(make_message(mPartialBinaryData.begin(), mPartialBinaryData.end(), Message::Binary,
  463. sid));
  464. mPartialBinaryData.clear();
  465. }
  466. break;
  467. case PPID_BINARY_EMPTY:
  468. // This only accounts for when the partial data is empty
  469. recv(make_message(mPartialBinaryData.begin(), mPartialBinaryData.end(), Message::Binary,
  470. sid));
  471. mPartialBinaryData.clear();
  472. break;
  473. default:
  474. // Unknown
  475. PLOG_WARNING << "Unknown PPID: " << uint32_t(ppid);
  476. return;
  477. }
  478. }
  479. void SctpTransport::processNotification(const union sctp_notification *notify, size_t len) {
  480. if (len != size_t(notify->sn_header.sn_length)) {
  481. PLOG_WARNING << "Invalid notification length";
  482. return;
  483. }
  484. auto type = notify->sn_header.sn_type;
  485. PLOG_VERBOSE << "Process notification, type=" << type;
  486. switch (type) {
  487. case SCTP_ASSOC_CHANGE: {
  488. const struct sctp_assoc_change &assoc_change = notify->sn_assoc_change;
  489. if (assoc_change.sac_state == SCTP_COMM_UP) {
  490. PLOG_INFO << "SCTP connected";
  491. changeState(State::Connected);
  492. } else {
  493. if (state() == State::Connecting) {
  494. PLOG_ERROR << "SCTP connection failed";
  495. changeState(State::Failed);
  496. } else {
  497. PLOG_INFO << "SCTP disconnected";
  498. changeState(State::Disconnected);
  499. }
  500. mWrittenCondition.notify_all();
  501. }
  502. break;
  503. }
  504. case SCTP_SENDER_DRY_EVENT: {
  505. // It not should be necessary since the send callback should have been called already,
  506. // but to be sure, let's try to send now.
  507. safeFlush();
  508. break;
  509. }
  510. case SCTP_STREAM_RESET_EVENT: {
  511. const struct sctp_stream_reset_event &reset_event = notify->sn_strreset_event;
  512. const int count = (reset_event.strreset_length - sizeof(reset_event)) / sizeof(uint16_t);
  513. const uint16_t flags = reset_event.strreset_flags;
  514. if (flags & SCTP_STREAM_RESET_OUTGOING_SSN) {
  515. for (int i = 0; i < count; ++i) {
  516. uint16_t streamId = reset_event.strreset_stream_list[i];
  517. close(streamId);
  518. }
  519. }
  520. if (flags & SCTP_STREAM_RESET_INCOMING_SSN) {
  521. const byte dataChannelCloseMessage{0x04};
  522. for (int i = 0; i < count; ++i) {
  523. uint16_t streamId = reset_event.strreset_stream_list[i];
  524. recv(make_message(&dataChannelCloseMessage, &dataChannelCloseMessage + 1,
  525. Message::Control, streamId));
  526. }
  527. }
  528. break;
  529. }
  530. default:
  531. // Ignore
  532. break;
  533. }
  534. }
  535. void SctpTransport::clearStats() {
  536. mBytesReceived = 0;
  537. mBytesSent = 0;
  538. }
  539. size_t SctpTransport::bytesSent() { return mBytesSent; }
  540. size_t SctpTransport::bytesReceived() { return mBytesReceived; }
  541. std::optional<milliseconds> SctpTransport::rtt() {
  542. if (!mSock || state() != State::Connected)
  543. return nullopt;
  544. struct sctp_status status = {};
  545. socklen_t len = sizeof(status);
  546. if (usrsctp_getsockopt(mSock, IPPROTO_SCTP, SCTP_STATUS, &status, &len)) {
  547. PLOG_WARNING << "Could not read SCTP_STATUS";
  548. return nullopt;
  549. }
  550. return milliseconds(status.sstat_primary.spinfo_srtt);
  551. }
  552. int SctpTransport::RecvCallback(struct socket *sock, union sctp_sockstore addr, void *data,
  553. size_t len, struct sctp_rcvinfo recv_info, int flags, void *ptr) {
  554. int ret = static_cast<SctpTransport *>(ptr)->handleRecv(
  555. sock, addr, static_cast<const byte *>(data), len, recv_info, flags);
  556. free(data);
  557. return ret;
  558. }
  559. int SctpTransport::SendCallback(struct socket *sock, uint32_t sb_free) {
  560. struct sctp_paddrinfo paddrinfo = {};
  561. socklen_t len = sizeof(paddrinfo);
  562. if (usrsctp_getsockopt(sock, IPPROTO_SCTP, SCTP_GET_PEER_ADDR_INFO, &paddrinfo, &len))
  563. return -1;
  564. auto sconn = reinterpret_cast<struct sockaddr_conn *>(&paddrinfo.spinfo_address);
  565. void *ptr = sconn->sconn_addr;
  566. return static_cast<SctpTransport *>(ptr)->handleSend(size_t(sb_free));
  567. }
  568. int SctpTransport::WriteCallback(void *ptr, void *data, size_t len, uint8_t tos, uint8_t set_df) {
  569. return static_cast<SctpTransport *>(ptr)->handleWrite(static_cast<byte *>(data), len, tos,
  570. set_df);
  571. }
  572. } // namespace rtc