Browse Source

Merge branch 'v0.18'

Paul-Louis Ageneau 2 years ago
parent
commit
3fd1b1a2ac
1 changed files with 31 additions and 9 deletions
  1. 31 9
      src/impl/sctptransport.cpp

+ 31 - 9
src/impl/sctptransport.cpp

@@ -371,6 +371,8 @@ void SctpTransport::connect() {
 
 bool SctpTransport::send(message_ptr message) {
 	std::lock_guard lock(mSendMutex);
+	if (state() != State::Connected)
+		return false;
 
 	if (!message)
 		return trySendQueue();
@@ -389,6 +391,9 @@ bool SctpTransport::send(message_ptr message) {
 bool SctpTransport::flush() {
 	try {
 		std::lock_guard lock(mSendMutex);
+		if (state() != State::Connected)
+			return false;
+
 		trySendQueue();
 		return true;
 
@@ -412,7 +417,20 @@ void SctpTransport::closeStream(unsigned int stream) {
 
 void SctpTransport::close() {
 	mSendQueue.stop();
-	mProcessor.enqueue(&SctpTransport::flush, shared_from_this());
+	if (state() == State::Connected) {
+		mProcessor.enqueue(&SctpTransport::flush, shared_from_this());
+	} else if (state() == State::Connecting) {
+		PLOG_DEBUG << "SCTP early shutdown";
+		if (usrsctp_shutdown(mSock, SHUT_RDWR)) {
+			if (errno == ENOTCONN) {
+				PLOG_VERBOSE << "SCTP already shut down";
+			} else {
+				PLOG_WARNING << "SCTP shutdown failed, errno=" << errno;
+			}
+		}
+		changeState(State::Failed);
+		mWrittenCondition.notify_all();
+	}
 }
 
 unsigned int SctpTransport::maxStream() const {
@@ -426,9 +444,12 @@ void SctpTransport::incoming(message_ptr message) {
 	// to be sent on our side (i.e. the local INIT) before proceeding.
 	if (!mWrittenOnce) { // test the atomic boolean is not set first to prevent a lock contention
 		std::unique_lock lock(mWriteMutex);
-		mWrittenCondition.wait(lock, [&]() { return mWrittenOnce.load(); });
+		mWrittenCondition.wait(lock, [&]() { return mWrittenOnce || state() == State::Failed; });
 	}
 
+	if(state() == State::Failed)
+		return;
+
 	if (!message) {
 		PLOG_INFO << "SCTP disconnected";
 		changeState(State::Disconnected);
@@ -565,7 +586,7 @@ bool SctpTransport::trySendQueue() {
 
 bool SctpTransport::trySendMessage(message_ptr message) {
 	// Requires mSendMutex to be locked
-	if (!mSock || state() != State::Connected)
+	if (state() != State::Connected)
 		return false;
 
 	uint32_t ppid;
@@ -675,7 +696,7 @@ void SctpTransport::triggerBufferedAmount(uint16_t streamId, size_t amount) {
 
 void SctpTransport::sendReset(uint16_t streamId) {
 	// Requires mSendMutex to be locked
-	if (!mSock || state() != State::Connected)
+	if (state() != State::Connected)
 		return;
 
 	PLOG_DEBUG << "SCTP resetting stream " << streamId;
@@ -823,12 +844,13 @@ void SctpTransport::processNotification(const union sctp_notification *notify, s
 			PLOG_INFO << "SCTP connected";
 			changeState(State::Connected);
 		} else {
-			if (state() == State::Connecting) {
-				PLOG_ERROR << "SCTP connection failed";
-				changeState(State::Failed);
-			} else {
+			if (state() == State::Connected) {
 				PLOG_INFO << "SCTP disconnected";
 				changeState(State::Disconnected);
+				recv(nullptr);
+			} else {
+				PLOG_ERROR << "SCTP connection failed";
+				changeState(State::Failed);
 			}
 			mWrittenCondition.notify_all();
 		}
@@ -900,7 +922,7 @@ size_t SctpTransport::bytesSent() { return mBytesSent; }
 size_t SctpTransport::bytesReceived() { return mBytesReceived; }
 
 optional<milliseconds> SctpTransport::rtt() {
-	if (!mSock || state() != State::Connected)
+	if (state() != State::Connected)
 		return nullopt;
 
 	struct sctp_status status = {};