datachannel.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393
  1. /**
  2. * Copyright (c) 2019-2021 Paul-Louis Ageneau
  3. *
  4. * This Source Code Form is subject to the terms of the Mozilla Public
  5. * License, v. 2.0. If a copy of the MPL was not distributed with this
  6. * file, You can obtain one at https://mozilla.org/MPL/2.0/.
  7. */
  8. #include "datachannel.hpp"
  9. #include "common.hpp"
  10. #include "internals.hpp"
  11. #include "logcounter.hpp"
  12. #include "peerconnection.hpp"
  13. #include "sctptransport.hpp"
  14. #include "utils.hpp"
  15. #include "rtc/datachannel.hpp"
  16. #include "rtc/track.hpp"
  17. #include <algorithm>
  18. #ifdef _WIN32
  19. #include <winsock2.h>
  20. #else
  21. #include <arpa/inet.h>
  22. #endif
  23. using std::chrono::milliseconds;
  24. namespace rtc::impl {
  25. using utils::to_uint16;
  26. using utils::to_uint32;
  27. // Messages for the DataChannel establishment protocol (RFC 8832)
  28. // See https://www.rfc-editor.org/rfc/rfc8832.html
  29. enum MessageType : uint8_t {
  30. MESSAGE_OPEN_REQUEST = 0x00,
  31. MESSAGE_OPEN_RESPONSE = 0x01,
  32. MESSAGE_ACK = 0x02,
  33. MESSAGE_OPEN = 0x03
  34. };
  35. enum ChannelType : uint8_t {
  36. CHANNEL_RELIABLE = 0x00,
  37. CHANNEL_PARTIAL_RELIABLE_REXMIT = 0x01,
  38. CHANNEL_PARTIAL_RELIABLE_TIMED = 0x02
  39. };
  40. #pragma pack(push, 1)
  41. struct OpenMessage {
  42. uint8_t type = MESSAGE_OPEN;
  43. uint8_t channelType;
  44. uint16_t priority;
  45. uint32_t reliabilityParameter;
  46. uint16_t labelLength;
  47. uint16_t protocolLength;
  48. // The following fields are:
  49. // uint8_t[labelLength] label
  50. // uint8_t[protocolLength] protocol
  51. };
  52. struct AckMessage {
  53. uint8_t type = MESSAGE_ACK;
  54. };
  55. #pragma pack(pop)
  56. bool DataChannel::IsOpenMessage(message_ptr message) {
  57. if (message->type != Message::Control)
  58. return false;
  59. auto raw = reinterpret_cast<const uint8_t *>(message->data());
  60. return !message->empty() && raw[0] == MESSAGE_OPEN;
  61. }
  62. DataChannel::DataChannel(weak_ptr<PeerConnection> pc, string label, string protocol,
  63. Reliability reliability)
  64. : mPeerConnection(pc), mLabel(std::move(label)), mProtocol(std::move(protocol)),
  65. mRecvQueue(RECV_QUEUE_LIMIT, message_size_func) {
  66. if(reliability.maxPacketLifeTime && reliability.maxRetransmits)
  67. throw std::invalid_argument("Both maxPacketLifeTime and maxRetransmits are set");
  68. mReliability = std::make_shared<Reliability>(std::move(reliability));
  69. }
  70. DataChannel::~DataChannel() {
  71. PLOG_VERBOSE << "Destroying DataChannel";
  72. try {
  73. close();
  74. } catch (const std::exception &e) {
  75. PLOG_ERROR << e.what();
  76. }
  77. }
  78. void DataChannel::close() {
  79. PLOG_VERBOSE << "Closing DataChannel";
  80. shared_ptr<SctpTransport> transport;
  81. {
  82. std::shared_lock lock(mMutex);
  83. transport = mSctpTransport.lock();
  84. }
  85. if (!mIsClosed.exchange(true)) {
  86. if (transport && mStream.has_value())
  87. transport->closeStream(mStream.value());
  88. triggerClosed();
  89. }
  90. resetCallbacks();
  91. }
  92. void DataChannel::remoteClose() { close(); }
  93. optional<message_variant> DataChannel::receive() {
  94. auto next = mRecvQueue.pop();
  95. return next ? std::make_optional(to_variant(std::move(**next))) : nullopt;
  96. }
  97. optional<message_variant> DataChannel::peek() {
  98. auto next = mRecvQueue.peek();
  99. return next ? std::make_optional(to_variant(**next)) : nullopt;
  100. }
  101. size_t DataChannel::availableAmount() const { return mRecvQueue.amount(); }
  102. optional<uint16_t> DataChannel::stream() const {
  103. std::shared_lock lock(mMutex);
  104. return mStream;
  105. }
  106. string DataChannel::label() const {
  107. std::shared_lock lock(mMutex);
  108. return mLabel;
  109. }
  110. string DataChannel::protocol() const {
  111. std::shared_lock lock(mMutex);
  112. return mProtocol;
  113. }
  114. Reliability DataChannel::reliability() const {
  115. std::shared_lock lock(mMutex);
  116. return *mReliability;
  117. }
  118. bool DataChannel::isOpen(void) const { return !mIsClosed && mIsOpen; }
  119. bool DataChannel::isClosed(void) const { return mIsClosed; }
  120. size_t DataChannel::maxMessageSize() const {
  121. auto pc = mPeerConnection.lock();
  122. return pc ? pc->remoteMaxMessageSize() : DEFAULT_MAX_MESSAGE_SIZE;
  123. }
  124. void DataChannel::assignStream(uint16_t stream) {
  125. std::unique_lock lock(mMutex);
  126. if (mStream.has_value())
  127. throw std::logic_error("DataChannel already has a stream assigned");
  128. mStream = stream;
  129. }
  130. void DataChannel::open(shared_ptr<SctpTransport> transport) {
  131. {
  132. std::unique_lock lock(mMutex);
  133. mSctpTransport = transport;
  134. }
  135. if (!mIsClosed && !mIsOpen.exchange(true))
  136. triggerOpen();
  137. }
  138. void DataChannel::processOpenMessage(message_ptr) {
  139. PLOG_WARNING << "Received an open message for a user-negotiated DataChannel, ignoring";
  140. }
  141. bool DataChannel::outgoing(message_ptr message) {
  142. shared_ptr<SctpTransport> transport;
  143. {
  144. std::shared_lock lock(mMutex);
  145. transport = mSctpTransport.lock();
  146. if (!transport || mIsClosed)
  147. throw std::runtime_error("DataChannel is closed");
  148. if (!mStream.has_value())
  149. throw std::logic_error("DataChannel has no stream assigned");
  150. if (message->size() > maxMessageSize())
  151. throw std::invalid_argument("Message size exceeds limit");
  152. // Before the ACK has been received on a DataChannel, all messages must be sent ordered
  153. message->reliability = mIsOpen ? mReliability : nullptr;
  154. message->stream = mStream.value();
  155. }
  156. return transport->send(message);
  157. }
  158. void DataChannel::incoming(message_ptr message) {
  159. if (!message || mIsClosed)
  160. return;
  161. switch (message->type) {
  162. case Message::Control: {
  163. if (message->size() == 0)
  164. break; // Ignore
  165. auto raw = reinterpret_cast<const uint8_t *>(message->data());
  166. switch (raw[0]) {
  167. case MESSAGE_OPEN:
  168. processOpenMessage(message);
  169. break;
  170. case MESSAGE_ACK:
  171. if (!mIsOpen.exchange(true)) {
  172. triggerOpen();
  173. }
  174. break;
  175. default:
  176. // Ignore
  177. break;
  178. }
  179. break;
  180. }
  181. case Message::Reset:
  182. remoteClose();
  183. break;
  184. case Message::String:
  185. case Message::Binary:
  186. mRecvQueue.push(message);
  187. triggerAvailable(mRecvQueue.size());
  188. break;
  189. default:
  190. // Ignore
  191. break;
  192. }
  193. }
  194. OutgoingDataChannel::OutgoingDataChannel(weak_ptr<PeerConnection> pc, string label, string protocol,
  195. Reliability reliability)
  196. : DataChannel(pc, std::move(label), std::move(protocol), std::move(reliability)) {}
  197. OutgoingDataChannel::~OutgoingDataChannel() {}
  198. void OutgoingDataChannel::open(shared_ptr<SctpTransport> transport) {
  199. std::unique_lock lock(mMutex);
  200. mSctpTransport = transport;
  201. if (!mStream.has_value())
  202. throw std::runtime_error("DataChannel has no stream assigned");
  203. uint8_t channelType;
  204. uint32_t reliabilityParameter;
  205. if (mReliability->maxPacketLifeTime) {
  206. channelType = CHANNEL_PARTIAL_RELIABLE_TIMED;
  207. reliabilityParameter = to_uint32(mReliability->maxPacketLifeTime->count());
  208. } else if (mReliability->maxRetransmits) {
  209. channelType = CHANNEL_PARTIAL_RELIABLE_REXMIT;
  210. reliabilityParameter = to_uint32(*mReliability->maxRetransmits);
  211. }
  212. // else {
  213. // channelType = CHANNEL_RELIABLE;
  214. // reliabilityParameter = 0;
  215. // }
  216. // Deprecated
  217. else
  218. switch (mReliability->typeDeprecated) {
  219. case Reliability::Type::Rexmit:
  220. channelType = CHANNEL_PARTIAL_RELIABLE_REXMIT;
  221. reliabilityParameter = to_uint32(std::max(std::get<int>(mReliability->rexmit), 0));
  222. break;
  223. case Reliability::Type::Timed:
  224. channelType = CHANNEL_PARTIAL_RELIABLE_TIMED;
  225. reliabilityParameter = to_uint32(std::get<milliseconds>(mReliability->rexmit).count());
  226. break;
  227. default:
  228. channelType = CHANNEL_RELIABLE;
  229. reliabilityParameter = 0;
  230. break;
  231. }
  232. if (mReliability->unordered)
  233. channelType |= 0x80;
  234. const size_t len = sizeof(OpenMessage) + mLabel.size() + mProtocol.size();
  235. binary buffer(len, byte(0));
  236. auto &open = *reinterpret_cast<OpenMessage *>(buffer.data());
  237. open.type = MESSAGE_OPEN;
  238. open.channelType = channelType;
  239. open.priority = htons(0);
  240. open.reliabilityParameter = htonl(reliabilityParameter);
  241. open.labelLength = htons(to_uint16(mLabel.size()));
  242. open.protocolLength = htons(to_uint16(mProtocol.size()));
  243. auto end = reinterpret_cast<char *>(buffer.data() + sizeof(OpenMessage));
  244. std::copy(mLabel.begin(), mLabel.end(), end);
  245. std::copy(mProtocol.begin(), mProtocol.end(), end + mLabel.size());
  246. lock.unlock();
  247. transport->send(make_message(buffer.begin(), buffer.end(), Message::Control, mStream.value()));
  248. }
  249. void OutgoingDataChannel::processOpenMessage(message_ptr) {
  250. PLOG_WARNING << "Received an open message for a locally-created DataChannel, ignoring";
  251. }
  252. IncomingDataChannel::IncomingDataChannel(weak_ptr<PeerConnection> pc,
  253. weak_ptr<SctpTransport> transport)
  254. : DataChannel(pc, "", "", {}) {
  255. mSctpTransport = transport;
  256. }
  257. IncomingDataChannel::~IncomingDataChannel() {}
  258. void IncomingDataChannel::open(shared_ptr<SctpTransport>) {
  259. // Ignore
  260. }
  261. void IncomingDataChannel::processOpenMessage(message_ptr message) {
  262. std::unique_lock lock(mMutex);
  263. auto transport = mSctpTransport.lock();
  264. if (!transport)
  265. throw std::logic_error("DataChannel has no transport");
  266. if (!mStream.has_value())
  267. throw std::logic_error("DataChannel has no stream assigned");
  268. if (message->size() < sizeof(OpenMessage))
  269. throw std::invalid_argument("DataChannel open message too small");
  270. OpenMessage open = *reinterpret_cast<const OpenMessage *>(message->data());
  271. open.priority = ntohs(open.priority);
  272. open.reliabilityParameter = ntohl(open.reliabilityParameter);
  273. open.labelLength = ntohs(open.labelLength);
  274. open.protocolLength = ntohs(open.protocolLength);
  275. if (message->size() < sizeof(OpenMessage) + size_t(open.labelLength + open.protocolLength))
  276. throw std::invalid_argument("DataChannel open message truncated");
  277. auto end = reinterpret_cast<const char *>(message->data() + sizeof(OpenMessage));
  278. mLabel.assign(end, open.labelLength);
  279. mProtocol.assign(end + open.labelLength, open.protocolLength);
  280. mReliability->unordered = (open.channelType & 0x80) != 0;
  281. mReliability->maxPacketLifeTime.reset();
  282. mReliability->maxRetransmits.reset();
  283. switch (open.channelType & 0x7F) {
  284. case CHANNEL_PARTIAL_RELIABLE_REXMIT:
  285. mReliability->maxRetransmits.emplace(open.reliabilityParameter);
  286. break;
  287. case CHANNEL_PARTIAL_RELIABLE_TIMED:
  288. mReliability->maxPacketLifeTime.emplace(milliseconds(open.reliabilityParameter));
  289. break;
  290. default:
  291. break;
  292. }
  293. // Deprecated
  294. switch (open.channelType & 0x7F) {
  295. case CHANNEL_PARTIAL_RELIABLE_REXMIT:
  296. mReliability->typeDeprecated = Reliability::Type::Rexmit;
  297. mReliability->rexmit = int(open.reliabilityParameter);
  298. break;
  299. case CHANNEL_PARTIAL_RELIABLE_TIMED:
  300. mReliability->typeDeprecated = Reliability::Type::Timed;
  301. mReliability->rexmit = milliseconds(open.reliabilityParameter);
  302. break;
  303. default:
  304. mReliability->typeDeprecated = Reliability::Type::Reliable;
  305. mReliability->rexmit = int(0);
  306. }
  307. lock.unlock();
  308. binary buffer(sizeof(AckMessage), byte(0));
  309. auto &ack = *reinterpret_cast<AckMessage *>(buffer.data());
  310. ack.type = MESSAGE_ACK;
  311. transport->send(make_message(buffer.begin(), buffer.end(), Message::Control, mStream.value()));
  312. if (!mIsOpen.exchange(true))
  313. triggerOpen();
  314. }
  315. } // namespace rtc::impl