| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674 |
- /**
- * Copyright (c) 2019 Paul-Louis Ageneau
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
- */
- #include "sctptransport.hpp"
- #include <chrono>
- #include <exception>
- #include <iostream>
- #include <thread>
- #include <vector>
- #ifdef USE_JUICE
- #ifndef __APPLE__
- // libjuice enables Linux path MTU discovery or sets the DF flag
- #define USE_PMTUD 1
- #else
- // Setting the DF flag is not available on Mac OS
- #define USE_PMTUD 0
- #endif
- #else
- #ifdef __linux__
- // Linux UDP does path MTU discovery by default (setting DF and returning EMSGSIZE)
- // It should be safe to enable discovery for SCTP.
- #define USE_PMTUD 1
- #else
- // Otherwise assume fragmentation
- #define USE_PMTUD 0
- #endif
- #endif
- using namespace std::chrono_literals;
- using namespace std::chrono;
- using std::shared_ptr;
- namespace rtc {
- void SctpTransport::Init() {
- usrsctp_init(0, &SctpTransport::WriteCallback, nullptr);
- usrsctp_sysctl_set_sctp_ecn_enable(0);
- usrsctp_sysctl_set_sctp_init_rtx_max_default(5);
- usrsctp_sysctl_set_sctp_path_rtx_max_default(5);
- usrsctp_sysctl_set_sctp_assoc_rtx_max_default(5); // single path
- usrsctp_sysctl_set_sctp_rto_min_default(1 * 1000); // ms
- usrsctp_sysctl_set_sctp_rto_max_default(10 * 1000); // ms
- usrsctp_sysctl_set_sctp_rto_initial_default(1 * 1000); // ms
- usrsctp_sysctl_set_sctp_init_rto_max_default(10 * 1000); // ms
- usrsctp_sysctl_set_sctp_heartbeat_interval_default(10 * 1000); // ms
- usrsctp_sysctl_set_sctp_max_chunks_on_queue(10 * 1024);
- // Change congestion control from the default TCP Reno (RFC 2581) to HighSpeed TCP (RFC 3649)
- usrsctp_sysctl_set_sctp_default_cc_module(SCTP_CC_HSTCP);
- // Increase the initial window size to 10 MTUs (RFC 6928)
- usrsctp_sysctl_set_sctp_initial_cwnd(10);
- }
- void SctpTransport::Cleanup() {
- while (usrsctp_finish() != 0)
- std::this_thread::sleep_for(100ms);
- }
- SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
- message_callback recvCallback, amount_callback bufferedAmountCallback,
- state_callback stateChangeCallback)
- : Transport(lower, std::move(stateChangeCallback)), mPort(port),
- mSendQueue(0, message_size_func), mBufferedAmountCallback(std::move(bufferedAmountCallback)) {
- onRecv(recvCallback);
- PLOG_DEBUG << "Initializing SCTP transport";
- usrsctp_register_address(this);
- mSock = usrsctp_socket(AF_CONN, SOCK_STREAM, IPPROTO_SCTP, &SctpTransport::RecvCallback,
- &SctpTransport::SendCallback, 0, this);
- if (!mSock)
- throw std::runtime_error("Could not create SCTP socket, errno=" + std::to_string(errno));
- if (usrsctp_set_non_blocking(mSock, 1))
- throw std::runtime_error("Unable to set non-blocking mode, errno=" + std::to_string(errno));
- // SCTP must stop sending after the lower layer is shut down, so disable linger
- struct linger sol = {};
- sol.l_onoff = 1;
- sol.l_linger = 0;
- if (usrsctp_setsockopt(mSock, SOL_SOCKET, SO_LINGER, &sol, sizeof(sol)))
- throw std::runtime_error("Could not set socket option SO_LINGER, errno=" +
- std::to_string(errno));
- struct sctp_assoc_value av = {};
- av.assoc_id = SCTP_ALL_ASSOC;
- av.assoc_value = 1;
- if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, &av, sizeof(av)))
- throw std::runtime_error("Could not set socket option SCTP_ENABLE_STREAM_RESET, errno=" +
- std::to_string(errno));
- struct sctp_event se = {};
- se.se_assoc_id = SCTP_ALL_ASSOC;
- se.se_on = 1;
- se.se_type = SCTP_ASSOC_CHANGE;
- if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_EVENT, &se, sizeof(se)))
- throw std::runtime_error("Could not subscribe to event SCTP_ASSOC_CHANGE, errno=" +
- std::to_string(errno));
- se.se_type = SCTP_SENDER_DRY_EVENT;
- if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_EVENT, &se, sizeof(se)))
- throw std::runtime_error("Could not subscribe to event SCTP_SENDER_DRY_EVENT, errno=" +
- std::to_string(errno));
- se.se_type = SCTP_STREAM_RESET_EVENT;
- if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_EVENT, &se, sizeof(se)))
- throw std::runtime_error("Could not subscribe to event SCTP_STREAM_RESET_EVENT, errno=" +
- std::to_string(errno));
- // The sender SHOULD disable the Nagle algorithm (see RFC1122) to minimize the latency.
- // See https://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-13#section-6.6
- int nodelay = 1;
- if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_NODELAY, &nodelay, sizeof(nodelay)))
- throw std::runtime_error("Could not set socket option SCTP_NODELAY, errno=" +
- std::to_string(errno));
- struct sctp_paddrparams spp = {};
- #if USE_PMTUD
- // Enabled SCTP path MTU discovery
- spp.spp_flags = SPP_PMTUD_ENABLE;
- #else
- // Fall back to a safe MTU value.
- spp.spp_flags = SPP_PMTUD_DISABLE;
- spp.spp_pathmtu = 1200; // Max safe value recommended by RFC 8261
- // See https://tools.ietf.org/html/rfc8261#section-5
- #endif
- if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS, &spp, sizeof(spp)))
- throw std::runtime_error("Could not set socket option SCTP_PEER_ADDR_PARAMS, errno=" +
- std::to_string(errno));
- // The IETF draft recommends the number of streams negotiated during SCTP association to be
- // 65535. See https://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-13#section-6.2
- struct sctp_initmsg sinit = {};
- sinit.sinit_num_ostreams = 65535;
- sinit.sinit_max_instreams = 65535;
- if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_INITMSG, &sinit, sizeof(sinit)))
- throw std::runtime_error("Could not set socket option SCTP_INITMSG, errno=" +
- std::to_string(errno));
- // Prevent fragmented interleave of messages (i.e. level 0), see RFC 6458 8.1.20.
- // Unless the user has set the fragmentation interleave level to 0, notifications
- // may also be interleaved with partially delivered messages.
- int level = 0;
- if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_FRAGMENT_INTERLEAVE, &level, sizeof(level)))
- throw std::runtime_error("Could not disable SCTP fragmented interleave, errno=" +
- std::to_string(errno));
- // The default send and receive window size of usrsctp is 256KiB, which is too small for
- // realistic RTTs, therefore we increase it to 1MiB for better performance.
- // See https://bugzilla.mozilla.org/show_bug.cgi?id=1051685
- int bufferSize = 1024 * 1024;
- if (usrsctp_setsockopt(mSock, SOL_SOCKET, SO_RCVBUF, &bufferSize, sizeof(bufferSize)))
- throw std::runtime_error("Could not set SCTP recv buffer size, errno=" +
- std::to_string(errno));
- if (usrsctp_setsockopt(mSock, SOL_SOCKET, SO_SNDBUF, &bufferSize, sizeof(bufferSize)))
- throw std::runtime_error("Could not set SCTP send buffer size, errno=" +
- std::to_string(errno));
- registerIncoming();
- connect();
- }
- SctpTransport::~SctpTransport() {
- stop();
- if (mSock)
- usrsctp_close(mSock);
- usrsctp_deregister_address(this);
- }
- bool SctpTransport::stop() {
- if (!Transport::stop())
- return false;
- mSendQueue.stop();
- safeFlush();
- shutdown();
- onRecv(nullptr);
- return true;
- }
- void SctpTransport::connect() {
- if (!mSock)
- return;
- PLOG_DEBUG << "SCTP connect";
- changeState(State::Connecting);
- struct sockaddr_conn sconn = {};
- sconn.sconn_family = AF_CONN;
- sconn.sconn_port = htons(mPort);
- sconn.sconn_addr = this;
- #ifdef HAVE_SCONN_LEN
- sconn.sconn_len = sizeof(sconn);
- #endif
- if (usrsctp_bind(mSock, reinterpret_cast<struct sockaddr *>(&sconn), sizeof(sconn)))
- throw std::runtime_error("Could not bind usrsctp socket, errno=" + std::to_string(errno));
- // According to the IETF draft, both endpoints must initiate the SCTP association, in a
- // simultaneous-open manner, irrelevent to the SDP setup role.
- // See https://tools.ietf.org/html/draft-ietf-mmusic-sctp-sdp-26#section-9.3
- int ret = usrsctp_connect(mSock, reinterpret_cast<struct sockaddr *>(&sconn), sizeof(sconn));
- if (ret && errno != EINPROGRESS)
- throw std::runtime_error("Connection attempt failed, errno=" + std::to_string(errno));
- }
- void SctpTransport::shutdown() {
- if (!mSock)
- return;
- PLOG_DEBUG << "SCTP shutdown";
- if (usrsctp_shutdown(mSock, SHUT_RDWR) != 0 && errno != ENOTCONN) {
- PLOG_WARNING << "SCTP shutdown failed, errno=" << errno;
- }
- // close() abort the connection when linger is disabled, call it now
- usrsctp_close(mSock);
- mSock = nullptr;
- PLOG_INFO << "SCTP disconnected";
- changeState(State::Disconnected);
- mWrittenCondition.notify_all();
- }
- bool SctpTransport::send(message_ptr message) {
- std::lock_guard lock(mSendMutex);
- if (!message)
- return mSendQueue.empty();
- PLOG_VERBOSE << "Send size=" << message->size();
- // If nothing is pending, try to send directly
- if (mSendQueue.empty() && trySendMessage(message))
- return true;
- mSendQueue.push(message);
- updateBufferedAmount(message->stream, message_size_func(message));
- return false;
- }
- void SctpTransport::close(unsigned int stream) {
- send(make_message(0, Message::Reset, uint16_t(stream)));
- }
- void SctpTransport::flush() {
- std::lock_guard lock(mSendMutex);
- trySendQueue();
- }
- void SctpTransport::incoming(message_ptr message) {
- // There could be a race condition here where we receive the remote INIT before the local one is
- // sent, which would result in the connection being aborted. Therefore, we need to wait for data
- // to be sent on our side (i.e. the local INIT) before proceeding.
- {
- std::unique_lock lock(mWriteMutex);
- mWrittenCondition.wait(lock, [&]() { return mWrittenOnce || state() != State::Connected; });
- }
- if (!message) {
- PLOG_INFO << "SCTP disconnected";
- changeState(State::Disconnected);
- recv(nullptr);
- return;
- }
- PLOG_VERBOSE << "Incoming size=" << message->size();
- usrsctp_conninput(this, message->data(), message->size(), 0);
- }
- bool SctpTransport::trySendQueue() {
- // Requires mSendMutex to be locked
- while (auto next = mSendQueue.peek()) {
- auto message = *next;
- if (!trySendMessage(message))
- return false;
- mSendQueue.pop();
- updateBufferedAmount(message->stream, -message_size_func(message));
- }
- return true;
- }
- bool SctpTransport::trySendMessage(message_ptr message) {
- // Requires mSendMutex to be locked
- if (!mSock || state() != State::Connected)
- return false;
- uint32_t ppid;
- switch (message->type) {
- case Message::String:
- ppid = !message->empty() ? PPID_STRING : PPID_STRING_EMPTY;
- break;
- case Message::Binary:
- ppid = !message->empty() ? PPID_BINARY : PPID_BINARY_EMPTY;
- break;
- case Message::Control:
- ppid = PPID_CONTROL;
- break;
- case Message::Reset:
- sendReset(message->stream);
- return true;
- default:
- // Ignore
- return true;
- }
- PLOG_VERBOSE << "SCTP try send size=" << message->size();
- // TODO: Implement SCTP ndata specification draft when supported everywhere
- // See https://tools.ietf.org/html/draft-ietf-tsvwg-sctp-ndata-08
- const Reliability reliability = message->reliability ? *message->reliability : Reliability();
- struct sctp_sendv_spa spa = {};
- // set sndinfo
- spa.sendv_flags |= SCTP_SEND_SNDINFO_VALID;
- spa.sendv_sndinfo.snd_sid = uint16_t(message->stream);
- spa.sendv_sndinfo.snd_ppid = htonl(ppid);
- spa.sendv_sndinfo.snd_flags |= SCTP_EOR; // implicit here
- // set prinfo
- spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
- if (reliability.unordered)
- spa.sendv_sndinfo.snd_flags |= SCTP_UNORDERED;
- switch (reliability.type) {
- case Reliability::TYPE_PARTIAL_RELIABLE_REXMIT:
- spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
- spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_RTX;
- spa.sendv_prinfo.pr_value = uint32_t(std::get<int>(reliability.rexmit));
- break;
- case Reliability::TYPE_PARTIAL_RELIABLE_TIMED:
- spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
- spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_TTL;
- spa.sendv_prinfo.pr_value = uint32_t(std::get<milliseconds>(reliability.rexmit).count());
- break;
- default:
- spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_NONE;
- break;
- }
- ssize_t ret;
- if (!message->empty()) {
- ret = usrsctp_sendv(mSock, message->data(), message->size(), nullptr, 0, &spa, sizeof(spa),
- SCTP_SENDV_SPA, 0);
- } else {
- const char zero = 0;
- ret = usrsctp_sendv(mSock, &zero, 1, nullptr, 0, &spa, sizeof(spa), SCTP_SENDV_SPA, 0);
- }
- if (ret >= 0) {
- PLOG_VERBOSE << "SCTP sent size=" << message->size();
- if (message->type == Message::Type::Binary || message->type == Message::Type::String)
- mBytesSent += message->size();
- return true;
- } else if (errno == EWOULDBLOCK || errno == EAGAIN) {
- PLOG_VERBOSE << "SCTP sending not possible";
- return false;
- } else {
- PLOG_ERROR << "SCTP sending failed, errno=" << errno;
- throw std::runtime_error("Sending failed, errno=" + std::to_string(errno));
- }
- }
- void SctpTransport::updateBufferedAmount(uint16_t streamId, long delta) {
- // Requires mSendMutex to be locked
- auto it = mBufferedAmount.insert(std::make_pair(streamId, 0)).first;
- size_t amount = size_t(std::max(long(it->second) + delta, long(0)));
- if (amount == 0)
- mBufferedAmount.erase(it);
- else
- it->second = amount;
- mSendMutex.unlock();
- try {
- mBufferedAmountCallback(streamId, amount);
- } catch (const std::exception &e) {
- PLOG_WARNING << "SCTP buffered amount callback: " << e.what();
- }
- mSendMutex.lock();
- }
- void SctpTransport::sendReset(uint16_t streamId) {
- // Requires mSendMutex to be locked
- if (!mSock || state() != State::Connected)
- return;
- PLOG_DEBUG << "SCTP resetting stream " << streamId;
- using srs_t = struct sctp_reset_streams;
- const size_t len = sizeof(srs_t) + sizeof(uint16_t);
- byte buffer[len] = {};
- srs_t &srs = *reinterpret_cast<srs_t *>(buffer);
- srs.srs_flags = SCTP_STREAM_RESET_OUTGOING;
- srs.srs_number_streams = 1;
- srs.srs_stream_list[0] = streamId;
- mWritten = false;
- if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_RESET_STREAMS, &srs, len) == 0) {
- std::unique_lock lock(mWriteMutex); // locking before setsockopt might deadlock usrsctp...
- mWrittenCondition.wait_for(lock, 1000ms,
- [&]() { return mWritten || state() != State::Connected; });
- } else if (errno == EINVAL) {
- PLOG_VERBOSE << "SCTP stream " << streamId << " already reset";
- } else {
- PLOG_WARNING << "SCTP reset stream " << streamId << " failed, errno=" << errno;
- }
- }
- bool SctpTransport::safeFlush() {
- try {
- flush();
- return true;
- } catch (const std::exception &e) {
- PLOG_WARNING << "SCTP flush: " << e.what();
- return false;
- }
- }
- int SctpTransport::handleRecv(struct socket *sock, union sctp_sockstore addr, const byte *data,
- size_t len, struct sctp_rcvinfo info, int flags) {
- try {
- PLOG_VERBOSE << "Handle recv, len=" << len;
- if (!len)
- return -1;
- // This is valid because SCTP_FRAGMENT_INTERLEAVE is set to level 0
- // so partial messages and notifications may not be interleaved.
- if (flags & MSG_EOR) {
- if (!mPartialRecv.empty()) {
- mPartialRecv.insert(mPartialRecv.end(), data, data + len);
- data = mPartialRecv.data();
- len = mPartialRecv.size();
- }
- // Message/Notification is complete, process it
- if (flags & MSG_NOTIFICATION)
- processNotification(reinterpret_cast<const union sctp_notification *>(data), len);
- else
- processData(data, len, info.rcv_sid, PayloadId(htonl(info.rcv_ppid)));
- mPartialRecv.clear();
- } else {
- // Message/Notification is not complete
- mPartialRecv.insert(mPartialRecv.end(), data, data + len);
- }
- } catch (const std::exception &e) {
- PLOG_ERROR << "SCTP recv: " << e.what();
- return -1;
- }
- return 0; // success
- }
- int SctpTransport::handleSend(size_t free) {
- PLOG_VERBOSE << "Handle send, free=" << free;
- return safeFlush() ? 0 : -1;
- }
- int SctpTransport::handleWrite(byte *data, size_t len, uint8_t tos, uint8_t set_df) {
- try {
- PLOG_VERBOSE << "Handle write, len=" << len;
- std::unique_lock lock(mWriteMutex);
- if (!outgoing(make_message(data, data + len)))
- return -1;
- mWritten = true;
- mWrittenOnce = true;
- mWrittenCondition.notify_all();
- } catch (const std::exception &e) {
- PLOG_ERROR << "SCTP write: " << e.what();
- return -1;
- }
- return 0; // success
- }
- void SctpTransport::processData(const byte *data, size_t len, uint16_t sid, PayloadId ppid) {
- PLOG_VERBOSE << "Process data, len=" << len;
- // The usage of the PPIDs "WebRTC String Partial" and "WebRTC Binary Partial" is deprecated.
- // See https://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-13#section-6.6
- // We handle them at reception for compatibility reasons but should never send them.
- switch (ppid) {
- case PPID_CONTROL:
- recv(make_message(data, data + len, Message::Control, sid));
- break;
- case PPID_STRING_PARTIAL: // deprecated
- mPartialStringData.insert(mPartialStringData.end(), data, data + len);
- break;
- case PPID_STRING:
- if (mPartialStringData.empty()) {
- mBytesReceived += len;
- recv(make_message(data, data + len, Message::String, sid));
- } else {
- mPartialStringData.insert(mPartialStringData.end(), data, data + len);
- mBytesReceived += mPartialStringData.size();
- recv(make_message(mPartialStringData.begin(), mPartialStringData.end(), Message::String,
- sid));
- mPartialStringData.clear();
- }
- break;
- case PPID_STRING_EMPTY:
- // This only accounts for when the partial data is empty
- recv(make_message(mPartialStringData.begin(), mPartialStringData.end(), Message::String,
- sid));
- mPartialStringData.clear();
- break;
- case PPID_BINARY_PARTIAL: // deprecated
- mPartialBinaryData.insert(mPartialBinaryData.end(), data, data + len);
- break;
- case PPID_BINARY:
- if (mPartialBinaryData.empty()) {
- mBytesReceived += len;
- recv(make_message(data, data + len, Message::Binary, sid));
- } else {
- mPartialBinaryData.insert(mPartialBinaryData.end(), data, data + len);
- mBytesReceived += mPartialStringData.size();
- recv(make_message(mPartialBinaryData.begin(), mPartialBinaryData.end(), Message::Binary,
- sid));
- mPartialBinaryData.clear();
- }
- break;
- case PPID_BINARY_EMPTY:
- // This only accounts for when the partial data is empty
- recv(make_message(mPartialBinaryData.begin(), mPartialBinaryData.end(), Message::Binary,
- sid));
- mPartialBinaryData.clear();
- break;
- default:
- // Unknown
- PLOG_WARNING << "Unknown PPID: " << uint32_t(ppid);
- return;
- }
- }
- void SctpTransport::processNotification(const union sctp_notification *notify, size_t len) {
- if (len != size_t(notify->sn_header.sn_length)) {
- PLOG_WARNING << "Invalid notification length";
- return;
- }
- auto type = notify->sn_header.sn_type;
- PLOG_VERBOSE << "Process notification, type=" << type;
- switch (type) {
- case SCTP_ASSOC_CHANGE: {
- const struct sctp_assoc_change &assoc_change = notify->sn_assoc_change;
- if (assoc_change.sac_state == SCTP_COMM_UP) {
- PLOG_INFO << "SCTP connected";
- changeState(State::Connected);
- } else {
- if (state() == State::Connecting) {
- PLOG_ERROR << "SCTP connection failed";
- changeState(State::Failed);
- } else {
- PLOG_INFO << "SCTP disconnected";
- changeState(State::Disconnected);
- }
- mWrittenCondition.notify_all();
- }
- break;
- }
- case SCTP_SENDER_DRY_EVENT: {
- // It not should be necessary since the send callback should have been called already,
- // but to be sure, let's try to send now.
- safeFlush();
- break;
- }
- case SCTP_STREAM_RESET_EVENT: {
- const struct sctp_stream_reset_event &reset_event = notify->sn_strreset_event;
- const int count = (reset_event.strreset_length - sizeof(reset_event)) / sizeof(uint16_t);
- const uint16_t flags = reset_event.strreset_flags;
- if (flags & SCTP_STREAM_RESET_OUTGOING_SSN) {
- for (int i = 0; i < count; ++i) {
- uint16_t streamId = reset_event.strreset_stream_list[i];
- close(streamId);
- }
- }
- if (flags & SCTP_STREAM_RESET_INCOMING_SSN) {
- const byte dataChannelCloseMessage{0x04};
- for (int i = 0; i < count; ++i) {
- uint16_t streamId = reset_event.strreset_stream_list[i];
- recv(make_message(&dataChannelCloseMessage, &dataChannelCloseMessage + 1,
- Message::Control, streamId));
- }
- }
- break;
- }
- default:
- // Ignore
- break;
- }
- }
- void SctpTransport::clearStats() {
- mBytesReceived = 0;
- mBytesSent = 0;
- }
- size_t SctpTransport::bytesSent() { return mBytesSent; }
- size_t SctpTransport::bytesReceived() { return mBytesReceived; }
- std::optional<milliseconds> SctpTransport::rtt() {
- if (!mSock || state() != State::Connected)
- return nullopt;
- struct sctp_status status = {};
- socklen_t len = sizeof(status);
- if (usrsctp_getsockopt(mSock, IPPROTO_SCTP, SCTP_STATUS, &status, &len)) {
- PLOG_WARNING << "Could not read SCTP_STATUS";
- return nullopt;
- }
- return milliseconds(status.sstat_primary.spinfo_srtt);
- }
- int SctpTransport::RecvCallback(struct socket *sock, union sctp_sockstore addr, void *data,
- size_t len, struct sctp_rcvinfo recv_info, int flags, void *ptr) {
- int ret = static_cast<SctpTransport *>(ptr)->handleRecv(
- sock, addr, static_cast<const byte *>(data), len, recv_info, flags);
- free(data);
- return ret;
- }
- int SctpTransport::SendCallback(struct socket *sock, uint32_t sb_free) {
- struct sctp_paddrinfo paddrinfo = {};
- socklen_t len = sizeof(paddrinfo);
- if (usrsctp_getsockopt(sock, IPPROTO_SCTP, SCTP_GET_PEER_ADDR_INFO, &paddrinfo, &len))
- return -1;
- auto sconn = reinterpret_cast<struct sockaddr_conn *>(&paddrinfo.spinfo_address);
- void *ptr = sconn->sconn_addr;
- return static_cast<SctpTransport *>(ptr)->handleSend(size_t(sb_free));
- }
- int SctpTransport::WriteCallback(void *ptr, void *data, size_t len, uint8_t tos, uint8_t set_df) {
- return static_cast<SctpTransport *>(ptr)->handleWrite(static_cast<byte *>(data), len, tos,
- set_df);
- }
- } // namespace rtc
|