|
@@ -115,6 +115,19 @@ size_t PeerConnection::remoteMaxMessageSize() const {
|
|
|
return std::min(remoteMax, localMax);
|
|
|
}
|
|
|
|
|
|
+// Helper for PeerConnection::initXTransport methods: start and emplace the transport
|
|
|
+template <typename T>
|
|
|
+shared_ptr<T> emplaceTransport(PeerConnection *pc, shared_ptr<T> *member, shared_ptr<T> transport) {
|
|
|
+ transport->start();
|
|
|
+ std::atomic_store(member, transport);
|
|
|
+ if (pc->state.load() == PeerConnection::State::Closed) {
|
|
|
+ std::atomic_store(member, decltype(transport)(nullptr));
|
|
|
+ transport->stop();
|
|
|
+ return nullptr;
|
|
|
+ }
|
|
|
+ return transport;
|
|
|
+}
|
|
|
+
|
|
|
shared_ptr<IceTransport> PeerConnection::initIceTransport() {
|
|
|
try {
|
|
|
if (auto transport = std::atomic_load(&mIceTransport))
|
|
@@ -136,7 +149,7 @@ shared_ptr<IceTransport> PeerConnection::initIceTransport() {
|
|
|
changeState(State::Failed);
|
|
|
break;
|
|
|
case IceTransport::State::Connected:
|
|
|
- initDtlsTransport();
|
|
|
+ mProcessor->enqueue(&PeerConnection::initDtlsTransport, this);
|
|
|
break;
|
|
|
case IceTransport::State::Disconnected:
|
|
|
changeState(State::Disconnected);
|
|
@@ -164,13 +177,7 @@ shared_ptr<IceTransport> PeerConnection::initIceTransport() {
|
|
|
}
|
|
|
});
|
|
|
|
|
|
- std::atomic_store(&mIceTransport, transport);
|
|
|
- if (state.load() == State::Closed) {
|
|
|
- std::atomic_store(&mIceTransport, decltype(mIceTransport)(nullptr));
|
|
|
- throw std::runtime_error("Connection is closed");
|
|
|
- }
|
|
|
- transport->start();
|
|
|
- return transport;
|
|
|
+ return emplaceTransport(this, &mIceTransport, std::move(transport));
|
|
|
|
|
|
} catch (const std::exception &e) {
|
|
|
PLOG_ERROR << e.what();
|
|
@@ -201,7 +208,7 @@ shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
|
|
|
switch (transportState) {
|
|
|
case DtlsTransport::State::Connected:
|
|
|
if (auto remote = remoteDescription(); remote && remote->hasApplication())
|
|
|
- initSctpTransport();
|
|
|
+ mProcessor->enqueue(&PeerConnection::initSctpTransport, this);
|
|
|
else
|
|
|
changeState(State::Connected);
|
|
|
|
|
@@ -239,13 +246,7 @@ shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
|
|
|
verifierCallback, dtlsStateChangeCallback);
|
|
|
}
|
|
|
|
|
|
- std::atomic_store(&mDtlsTransport, transport);
|
|
|
- if (state.load() == State::Closed) {
|
|
|
- std::atomic_store(&mDtlsTransport, decltype(mDtlsTransport)(nullptr));
|
|
|
- throw std::runtime_error("Connection is closed");
|
|
|
- }
|
|
|
- transport->start();
|
|
|
- return transport;
|
|
|
+ return emplaceTransport(this, &mDtlsTransport, std::move(transport));
|
|
|
|
|
|
} catch (const std::exception &e) {
|
|
|
PLOG_ERROR << e.what();
|
|
@@ -301,13 +302,7 @@ shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
|
|
|
}
|
|
|
});
|
|
|
|
|
|
- std::atomic_store(&mSctpTransport, transport);
|
|
|
- if (state.load() == State::Closed) {
|
|
|
- std::atomic_store(&mSctpTransport, decltype(mSctpTransport)(nullptr));
|
|
|
- throw std::runtime_error("Connection is closed");
|
|
|
- }
|
|
|
- transport->start();
|
|
|
- return transport;
|
|
|
+ return emplaceTransport(this, &mSctpTransport, std::move(transport));
|
|
|
|
|
|
} catch (const std::exception &e) {
|
|
|
PLOG_ERROR << e.what();
|
|
@@ -344,18 +339,19 @@ void PeerConnection::closeTransports() {
|
|
|
auto sctp = std::atomic_exchange(&mSctpTransport, decltype(mSctpTransport)(nullptr));
|
|
|
auto dtls = std::atomic_exchange(&mDtlsTransport, decltype(mDtlsTransport)(nullptr));
|
|
|
auto ice = std::atomic_exchange(&mIceTransport, decltype(mIceTransport)(nullptr));
|
|
|
- ThreadPool::Instance().enqueue([sctp, dtls, ice]() mutable {
|
|
|
- if (sctp)
|
|
|
- sctp->stop();
|
|
|
- if (dtls)
|
|
|
- dtls->stop();
|
|
|
- if (ice)
|
|
|
- ice->stop();
|
|
|
-
|
|
|
- sctp.reset();
|
|
|
- dtls.reset();
|
|
|
- ice.reset();
|
|
|
- });
|
|
|
+ ThreadPool::Instance().enqueue(
|
|
|
+ [sctp = std::move(sctp), dtls = std::move(dtls), ice = std::move(ice)]() mutable {
|
|
|
+ if (sctp)
|
|
|
+ sctp->stop();
|
|
|
+ if (dtls)
|
|
|
+ dtls->stop();
|
|
|
+ if (ice)
|
|
|
+ ice->stop();
|
|
|
+
|
|
|
+ sctp.reset();
|
|
|
+ dtls.reset();
|
|
|
+ ice.reset();
|
|
|
+ });
|
|
|
});
|
|
|
}
|
|
|
|
|
@@ -937,6 +933,9 @@ void PeerConnection::processRemoteDescription(Description description) {
|
|
|
}
|
|
|
|
|
|
auto iceTransport = initIceTransport();
|
|
|
+ if (!iceTransport)
|
|
|
+ return; // closed
|
|
|
+
|
|
|
iceTransport->setRemoteDescription(std::move(description));
|
|
|
|
|
|
// Since we assumed passive role during DataChannel creation, we might need to shift the stream
|
|
@@ -1037,11 +1036,11 @@ void PeerConnection::triggerPendingTracks() {
|
|
|
}
|
|
|
|
|
|
void PeerConnection::flushPendingDataChannels() {
|
|
|
- mProcessor->enqueue(std::bind(&PeerConnection::triggerPendingDataChannels, this));
|
|
|
+ mProcessor->enqueue(&PeerConnection::triggerPendingDataChannels, this);
|
|
|
}
|
|
|
|
|
|
void PeerConnection::flushPendingTracks() {
|
|
|
- mProcessor->enqueue(std::bind(&PeerConnection::triggerPendingTracks, this));
|
|
|
+ mProcessor->enqueue(&PeerConnection::triggerPendingTracks, this);
|
|
|
}
|
|
|
|
|
|
bool PeerConnection::changeState(State newState) {
|