Browse Source

Refactored SCTP shutdown

Paul-Louis Ageneau 2 years ago
parent
commit
55f67c2a6d
3 changed files with 15 additions and 24 deletions
  1. 0 2
      src/impl/peerconnection.cpp
  2. 14 22
      src/impl/sctptransport.cpp
  3. 1 0
      src/impl/sctptransport.hpp

+ 0 - 2
src/impl/peerconnection.cpp

@@ -318,12 +318,10 @@ shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
 				    mProcessor.enqueue(&PeerConnection::openDataChannels, shared_from_this());
 				    break;
 			    case SctpTransport::State::Failed:
-				    LOG_WARNING << "SCTP transport failed";
 				    changeState(State::Failed);
 				    mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
 				    break;
 			    case SctpTransport::State::Disconnected:
-				    LOG_INFO << "SCTP transport disconnected";
 				    changeState(State::Disconnected);
 				    mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
 				    break;

+ 14 - 22
src/impl/sctptransport.cpp

@@ -175,7 +175,7 @@ void SctpTransport::SetSettings(const SctpSettings &s) {
 }
 
 void SctpTransport::Cleanup() {
-	while (usrsctp_finish() != 0)
+	while (usrsctp_finish())
 		std::this_thread::sleep_for(100ms);
 }
 
@@ -329,6 +329,8 @@ SctpTransport::SctpTransport(shared_ptr<Transport> lower, const Configuration &c
 SctpTransport::~SctpTransport() {
 	PLOG_DEBUG << "Destroying SCTP transport";
 
+	mProcessor.join(); // if we are here, the processor must be empty
+
 	// Before unregistering incoming() from the lower layer, we need to make sure the thread from
 	// lower layers is not blocked in incoming() by the WrittenOnce condition.
 	mWrittenOnce = true;
@@ -336,7 +338,6 @@ SctpTransport::~SctpTransport() {
 
 	unregisterIncoming();
 
-	mProcessor.join();
 	usrsctp_close(mSock);
 
 	usrsctp_deregister_address(this);
@@ -366,9 +367,6 @@ struct sockaddr_conn SctpTransport::getSockAddrConn(uint16_t port) {
 }
 
 void SctpTransport::connect() {
-	if (!mSock)
-		throw std::logic_error("Attempted SCTP connect with closed socket");
-
 	PLOG_DEBUG << "SCTP connecting (local port=" << mPorts.local
 	           << ", remote port=" << mPorts.remote << ")";
 	changeState(State::Connecting);
@@ -386,17 +384,6 @@ void SctpTransport::connect() {
 		throw std::runtime_error("Connection attempt failed, errno=" + std::to_string(errno));
 }
 
-void SctpTransport::shutdown() {
-	if (!mSock)
-		return;
-
-	PLOG_DEBUG << "SCTP shutdown";
-
-	if (usrsctp_shutdown(mSock, SHUT_RDWR) != 0 && errno != ENOTCONN) {
-		PLOG_WARNING << "SCTP shutdown failed, errno=" << errno;
-	}
-}
-
 bool SctpTransport::send(message_ptr message) {
 	std::lock_guard lock(mSendMutex);
 
@@ -553,9 +540,17 @@ bool SctpTransport::trySendQueue() {
 		updateBufferedAmount(to_uint16(message->stream), -ptrdiff_t(message_size_func(message)));
 	}
 
-	if (!mSendQueue.running()) {
-		shutdown();
-		return false;
+	if (!mSendQueue.running() && !std::exchange(mSendShutdown, true)) {
+		PLOG_DEBUG << "SCTP shutdown";
+		if (usrsctp_shutdown(mSock, SHUT_WR)) {
+			if (errno == ENOTCONN) {
+				PLOG_VERBOSE << "SCTP already shut down";
+			} else {
+				PLOG_WARNING << "SCTP shutdown failed, errno=" << errno;
+				changeState(State::Disconnected);
+				recv(nullptr);
+			}
+		}
 	}
 
 	return true;
@@ -700,9 +695,6 @@ void SctpTransport::sendReset(uint16_t streamId) {
 
 void SctpTransport::handleUpcall() {
 	try {
-		if (!mSock)
-			return;
-
 		PLOG_VERBOSE << "Handle upcall";
 
 		int events = usrsctp_get_events(mSock);

+ 1 - 0
src/impl/sctptransport.hpp

@@ -114,6 +114,7 @@ private:
 	std::mutex mRecvMutex;
 	std::recursive_mutex mSendMutex; // buffered amount callback is synchronous
 	Queue<message_ptr> mSendQueue;
+	bool mSendShutdown = false;
 	std::map<uint16_t, size_t> mBufferedAmount;
 	amount_callback mBufferedAmountCallback;