Browse Source

Cleanup and destruction fixes

Paul-Louis Ageneau 5 years ago
parent
commit
58eea3fcf6
3 changed files with 33 additions and 30 deletions
  1. 3 3
      src/dtlstransport.cpp
  2. 20 15
      src/sctptransport.cpp
  3. 10 12
      src/transport.hpp

+ 3 - 3
src/dtlstransport.cpp

@@ -85,10 +85,10 @@ DtlsTransport::DtlsTransport(shared_ptr<IceTransport> lower, shared_ptr<Certific
 }
 }
 
 
 DtlsTransport::~DtlsTransport() {
 DtlsTransport::~DtlsTransport() {
-	onRecv(nullptr); // unset recv callback
-
 	mIncomingQueue.stop();
 	mIncomingQueue.stop();
-	mRecvThread.join();
+
+	if (mRecvThread.joinable())
+		mRecvThread.join();
 
 
 	gnutls_bye(mSession, GNUTLS_SHUT_RDWR);
 	gnutls_bye(mSession, GNUTLS_SHUT_RDWR);
 	gnutls_deinit(mSession);
 	gnutls_deinit(mSession);

+ 20 - 15
src/sctptransport.cpp

@@ -97,7 +97,7 @@ SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
 		throw std::runtime_error("Could not subscribe to event SCTP_STREAM_RESET_EVENT, errno=" +
 		throw std::runtime_error("Could not subscribe to event SCTP_STREAM_RESET_EVENT, errno=" +
 		                         std::to_string(errno));
 		                         std::to_string(errno));
 
 
-	// The sender SHOULD disable the Nagle algorithm (see [RFC1122]) to minimize the latency.
+	// The sender SHOULD disable the Nagle algorithm (see RFC1122) to minimize the latency.
 	// See https://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-13#section-6.6
 	// See https://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-13#section-6.6
 	int nodelay = 1;
 	int nodelay = 1;
 	if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_NODELAY, &nodelay, sizeof(nodelay)))
 	if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_NODELAY, &nodelay, sizeof(nodelay)))
@@ -144,10 +144,16 @@ SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
 
 
 SctpTransport::~SctpTransport() {
 SctpTransport::~SctpTransport() {
 	onRecv(nullptr); // unset recv callback
 	onRecv(nullptr); // unset recv callback
-	mStopping = true;
-	mConnectCondition.notify_all();
+
 	mSendQueue.stop();
 	mSendQueue.stop();
 
 
+	// Unblock incoming
+	if (!mConnectDataSent) {
+		std::unique_lock<std::mutex> lock(mConnectMutex);
+		mConnectDataSent = true;
+		mConnectCondition.notify_all();
+	}
+
 	if (mSock) {
 	if (mSock) {
 		usrsctp_shutdown(mSock, SHUT_RDWR);
 		usrsctp_shutdown(mSock, SHUT_RDWR);
 		usrsctp_close(mSock);
 		usrsctp_close(mSock);
@@ -182,7 +188,7 @@ void SctpTransport::connect() {
 SctpTransport::State SctpTransport::state() const { return mState; }
 SctpTransport::State SctpTransport::state() const { return mState; }
 
 
 bool SctpTransport::send(message_ptr message) {
 bool SctpTransport::send(message_ptr message) {
-	if (!message || mStopping)
+	if (!message)
 		return false;
 		return false;
 
 
 	updateBufferedAmount(message->stream, message->size());
 	updateBufferedAmount(message->stream, message->size());
@@ -214,11 +220,10 @@ void SctpTransport::incoming(message_ptr message) {
 	// to be sent on our side (i.e. the local INIT) before proceeding.
 	// to be sent on our side (i.e. the local INIT) before proceeding.
 	if (!mConnectDataSent) {
 	if (!mConnectDataSent) {
 		std::unique_lock<std::mutex> lock(mConnectMutex);
 		std::unique_lock<std::mutex> lock(mConnectMutex);
-		mConnectCondition.wait(lock, [this] { return mConnectDataSent || mStopping; });
+		mConnectCondition.wait(lock, [this]() -> bool { return mConnectDataSent; });
 	}
 	}
 
 
-	if (!mStopping)
-		usrsctp_conninput(this, message->data(), message->size(), 0);
+	usrsctp_conninput(this, message->data(), message->size(), 0);
 }
 }
 
 
 void SctpTransport::changeState(State state) {
 void SctpTransport::changeState(State state) {
@@ -430,9 +435,9 @@ void SctpTransport::processNotification(const union sctp_notification *notify, s
 
 
 	switch (notify->sn_header.sn_type) {
 	switch (notify->sn_header.sn_type) {
 	case SCTP_ASSOC_CHANGE: {
 	case SCTP_ASSOC_CHANGE: {
-		const struct sctp_assoc_change *assoc_change = &notify->sn_assoc_change;
+		const struct sctp_assoc_change &assoc_change = notify->sn_assoc_change;
 		std::unique_lock<std::mutex> lock(mConnectMutex);
 		std::unique_lock<std::mutex> lock(mConnectMutex);
-		if (assoc_change->sac_state == SCTP_COMM_UP) {
+		if (assoc_change.sac_state == SCTP_COMM_UP) {
 			changeState(State::Connected);
 			changeState(State::Connected);
 		} else {
 		} else {
 			if (mState == State::Connecting) {
 			if (mState == State::Connecting) {
@@ -449,20 +454,20 @@ void SctpTransport::processNotification(const union sctp_notification *notify, s
 		trySendAll();
 		trySendAll();
 	}
 	}
 	case SCTP_STREAM_RESET_EVENT: {
 	case SCTP_STREAM_RESET_EVENT: {
-		const struct sctp_stream_reset_event *reset_event = &notify->sn_strreset_event;
-		const int count = (reset_event->strreset_length - sizeof(*reset_event)) / sizeof(uint16_t);
+		const struct sctp_stream_reset_event &reset_event = notify->sn_strreset_event;
+		const int count = (reset_event.strreset_length - sizeof(reset_event)) / sizeof(uint16_t);
 
 
-		if (reset_event->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
+		if (reset_event.strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
 			for (int i = 0; i < count; ++i) {
 			for (int i = 0; i < count; ++i) {
-				uint16_t streamId = reset_event->strreset_stream_list[i];
+				uint16_t streamId = reset_event.strreset_stream_list[i];
 				reset(streamId);
 				reset(streamId);
 			}
 			}
 		}
 		}
 
 
-		if (reset_event->strreset_flags & SCTP_STREAM_RESET_OUTGOING_SSN) {
+		if (reset_event.strreset_flags & SCTP_STREAM_RESET_OUTGOING_SSN) {
 			const byte dataChannelCloseMessage{0x04};
 			const byte dataChannelCloseMessage{0x04};
 			for (int i = 0; i < count; ++i) {
 			for (int i = 0; i < count; ++i) {
-				uint16_t streamId = reset_event->strreset_stream_list[i];
+				uint16_t streamId = reset_event.strreset_stream_list[i];
 				recv(make_message(&dataChannelCloseMessage, &dataChannelCloseMessage + 1,
 				recv(make_message(&dataChannelCloseMessage, &dataChannelCloseMessage + 1,
 				                  Message::Control, streamId));
 				                  Message::Control, streamId));
 			}
 			}

+ 10 - 12
src/transport.hpp

@@ -31,27 +31,25 @@ using namespace std::placeholders;
 
 
 class Transport {
 class Transport {
 public:
 public:
-	Transport(std::shared_ptr<Transport> lower = nullptr) : mLower(lower) { init(); }
-	virtual ~Transport() {}
+	Transport(std::shared_ptr<Transport> lower = nullptr) : mLower(std::move(lower)) {
+		if (mLower)
+			mLower->onRecv(std::bind(&Transport::incoming, this, _1));
+	}
+	virtual ~Transport() {
+		if (mLower)
+			mLower->onRecv(nullptr);
+	}
 
 
 	virtual bool send(message_ptr message) = 0;
 	virtual bool send(message_ptr message) = 0;
 	void onRecv(message_callback callback) { mRecvCallback = std::move(callback); }
 	void onRecv(message_callback callback) { mRecvCallback = std::move(callback); }
 
 
 protected:
 protected:
-	void recv(message_ptr message) {
-		if (mRecvCallback)
-			mRecvCallback(message);
-	}
+	void recv(message_ptr message) { mRecvCallback(message); }
 
 
 	virtual void incoming(message_ptr message) = 0;
 	virtual void incoming(message_ptr message) = 0;
 	virtual void outgoing(message_ptr message) { getLower()->send(message); }
 	virtual void outgoing(message_ptr message) { getLower()->send(message); }
 
 
 private:
 private:
-	void init() {
-		if (mLower)
-			mLower->onRecv(std::bind(&Transport::incoming, this, _1));
-	}
-
 	std::shared_ptr<Transport> getLower() {
 	std::shared_ptr<Transport> getLower() {
 		if (mLower)
 		if (mLower)
 			return mLower;
 			return mLower;
@@ -60,7 +58,7 @@ private:
 	}
 	}
 
 
 	std::shared_ptr<Transport> mLower;
 	std::shared_ptr<Transport> mLower;
-	message_callback mRecvCallback;
+	synchronized_callback<message_ptr> mRecvCallback;
 };
 };
 
 
 } // namespace rtc
 } // namespace rtc