|
@@ -62,12 +62,13 @@ DataChannel::DataChannel(shared_ptr<PeerConnection> pc, unsigned int stream, str
|
|
|
: mPeerConnection(std::move(pc)), mStream(stream), mLabel(std::move(label)),
|
|
|
mProtocol(std::move(protocol)),
|
|
|
mReliability(std::make_shared<Reliability>(std::move(reliability))),
|
|
|
- mRecvQueue(RECV_QUEUE_SIZE) {}
|
|
|
+ mRecvQueue(RECV_QUEUE_SIZE, message_size_func) {}
|
|
|
|
|
|
DataChannel::DataChannel(shared_ptr<PeerConnection> pc, shared_ptr<SctpTransport> transport,
|
|
|
unsigned int stream)
|
|
|
: mPeerConnection(std::move(pc)), mSctpTransport(transport), mStream(stream),
|
|
|
- mReliability(std::make_shared<Reliability>()) {}
|
|
|
+ mReliability(std::make_shared<Reliability>()),
|
|
|
+ mRecvQueue(RECV_QUEUE_SIZE, message_size_func) {}
|
|
|
|
|
|
DataChannel::~DataChannel() { close(); }
|
|
|
|
|
@@ -80,29 +81,18 @@ void DataChannel::close() {
|
|
|
}
|
|
|
|
|
|
void DataChannel::send(const std::variant<binary, string> &data) {
|
|
|
- if (mIsClosed || !mSctpTransport)
|
|
|
- return;
|
|
|
-
|
|
|
std::visit(
|
|
|
- [this](const auto &d) {
|
|
|
+ [&](const auto &d) {
|
|
|
using T = std::decay_t<decltype(d)>;
|
|
|
constexpr auto type = std::is_same_v<T, string> ? Message::String : Message::Binary;
|
|
|
auto *b = reinterpret_cast<const byte *>(d.data());
|
|
|
- // Before the ACK has been received on a DataChannel, all messages must be sent ordered
|
|
|
- auto reliability = mIsOpen ? mReliability : nullptr;
|
|
|
- auto message = make_message(b, b + d.size(), type, mStream, reliability);
|
|
|
- mSctpTransport->send(message);
|
|
|
+ outgoing(std::make_shared<Message>(b, b + d.size(), type));
|
|
|
},
|
|
|
data);
|
|
|
}
|
|
|
|
|
|
void DataChannel::send(const byte *data, size_t size) {
|
|
|
- if (mIsClosed || !mSctpTransport)
|
|
|
- return;
|
|
|
-
|
|
|
- auto reliability = mIsOpen ? mReliability : nullptr;
|
|
|
- auto message = make_message(data, data + size, Message::Binary, mStream, reliability);
|
|
|
- mSctpTransport->send(message);
|
|
|
+ outgoing(std::make_shared<Message>(data, data + size, Message::Binary));
|
|
|
}
|
|
|
|
|
|
std::optional<std::variant<binary, string>> DataChannel::receive() {
|
|
@@ -120,11 +110,9 @@ std::optional<std::variant<binary, string>> DataChannel::receive() {
|
|
|
break;
|
|
|
}
|
|
|
case Message::String:
|
|
|
- mRecvSize -= message->size();
|
|
|
return std::make_optional(
|
|
|
string(reinterpret_cast<const char *>(message->data()), message->size()));
|
|
|
case Message::Binary:
|
|
|
- mRecvSize -= message->size();
|
|
|
return std::make_optional(std::move(*message));
|
|
|
}
|
|
|
}
|
|
@@ -132,9 +120,11 @@ std::optional<std::variant<binary, string>> DataChannel::receive() {
|
|
|
return nullopt;
|
|
|
}
|
|
|
|
|
|
-size_t DataChannel::available() const { return mRecvQueue.size(); }
|
|
|
+bool DataChannel::isOpen(void) const { return mIsOpen; }
|
|
|
+
|
|
|
+bool DataChannel::isClosed(void) const { return mIsClosed; }
|
|
|
|
|
|
-size_t DataChannel::availableSize() const { return mRecvSize; }
|
|
|
+size_t DataChannel::availableAmount() const { return mRecvQueue.amount(); }
|
|
|
|
|
|
unsigned int DataChannel::stream() const { return mStream; }
|
|
|
|
|
@@ -144,10 +134,6 @@ string DataChannel::protocol() const { return mProtocol; }
|
|
|
|
|
|
Reliability DataChannel::reliability() const { return *mReliability; }
|
|
|
|
|
|
-bool DataChannel::isOpen(void) const { return mIsOpen; }
|
|
|
-
|
|
|
-bool DataChannel::isClosed(void) const { return mIsClosed; }
|
|
|
-
|
|
|
void DataChannel::open(shared_ptr<SctpTransport> sctpTransport) {
|
|
|
mSctpTransport = sctpTransport;
|
|
|
|
|
@@ -179,6 +165,15 @@ void DataChannel::open(shared_ptr<SctpTransport> sctpTransport) {
|
|
|
mSctpTransport->send(make_message(buffer.begin(), buffer.end(), Message::Control, mStream));
|
|
|
}
|
|
|
|
|
|
+void DataChannel::outgoing(mutable_message_ptr message) {
|
|
|
+ if (mIsClosed || !mSctpTransport)
|
|
|
+ return;
|
|
|
+ // Before the ACK has been received on a DataChannel, all messages must be sent ordered
|
|
|
+ message->reliability = mIsOpen ? mReliability : nullptr;
|
|
|
+ message->stream = mStream;
|
|
|
+ mSctpTransport->send(message);
|
|
|
+}
|
|
|
+
|
|
|
void DataChannel::incoming(message_ptr message) {
|
|
|
switch (message->type) {
|
|
|
case Message::Control: {
|
|
@@ -205,7 +200,6 @@ void DataChannel::incoming(message_ptr message) {
|
|
|
}
|
|
|
case Message::String:
|
|
|
case Message::Binary:
|
|
|
- mRecvSize += message->size();
|
|
|
mRecvQueue.push(message);
|
|
|
triggerAvailable(mRecvQueue.size());
|
|
|
break;
|