Browse Source

Implemented async sending in SCTP transport

Paul-Louis Ageneau 5 years ago
parent
commit
84219d381d
2 changed files with 85 additions and 62 deletions
  1. 82 62
      src/sctptransport.cpp
  2. 3 0
      src/sctptransport.hpp

+ 82 - 62
src/sctptransport.cpp

@@ -51,7 +51,7 @@ SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port, me
                              state_callback stateChangeCallback)
     : Transport(lower), mPort(port), mState(State::Disconnected),
       mStateChangeCallback(std::move(stateChangeCallback)) {
-  
+
   onRecv(recv);
 
 	GlobalInit();
@@ -120,9 +120,11 @@ SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port, me
 }
 
 SctpTransport::~SctpTransport() {
-  onRecv(nullptr);
+	onRecv(nullptr);
 	mStopping = true;
 	mConnectCondition.notify_all();
+	mSendQueue.stop();
+
 	if (mConnectThread.joinable())
 		mConnectThread.join();
 
@@ -138,61 +140,11 @@ SctpTransport::~SctpTransport() {
 SctpTransport::State SctpTransport::state() const { return mState; }
 
 bool SctpTransport::send(message_ptr message) {
-	if (!message)
+	if (!message || mStopping)
 		return false;
 
-	const Reliability reliability = message->reliability ? *message->reliability : Reliability();
-
-	struct sctp_sendv_spa spa = {};
-
-	uint32_t ppid;
-	switch (message->type) {
-	case Message::String:
-		ppid = !message->empty() ? PPID_STRING : PPID_STRING_EMPTY;
-		break;
-	case Message::Binary:
-		ppid = !message->empty() ? PPID_BINARY : PPID_BINARY_EMPTY;
-		break;
-	default:
-		ppid = PPID_CONTROL;
-		break;
-	}
-
-	// set sndinfo
-	spa.sendv_flags |= SCTP_SEND_SNDINFO_VALID;
-	spa.sendv_sndinfo.snd_sid = uint16_t(message->stream);
-	spa.sendv_sndinfo.snd_ppid = htonl(ppid);
-	spa.sendv_sndinfo.snd_flags |= SCTP_EOR;
-
-	// set prinfo
-	spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
-	if (reliability.unordered)
-		spa.sendv_sndinfo.snd_flags |= SCTP_UNORDERED;
-
-	using std::chrono::milliseconds;
-	switch (reliability.type) {
-	case Reliability::TYPE_PARTIAL_RELIABLE_REXMIT:
-		spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
-		spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_RTX;
-		spa.sendv_prinfo.pr_value = uint32_t(std::get<int>(reliability.rexmit));
-		break;
-	case Reliability::TYPE_PARTIAL_RELIABLE_TIMED:
-		spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
-		spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_TTL;
-		spa.sendv_prinfo.pr_value = uint32_t(std::get<milliseconds>(reliability.rexmit).count());
-		break;
-	default:
-		spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_NONE;
-		break;
-	}
-
-	if (!message->empty()) {
-		return usrsctp_sendv(mSock, message->data(), message->size(), nullptr, 0, &spa, sizeof(spa),
-		                     SCTP_SENDV_SPA, 0) > 0;
-	} else {
-		const char zero = 0;
-		return usrsctp_sendv(mSock, &zero, 1, nullptr, 0, &spa, sizeof(spa), SCTP_SENDV_SPA, 0) > 0;
-	}
+	mSendQueue.push(message);
+	return true;
 }
 
 void SctpTransport::reset(unsigned int stream) {
@@ -246,19 +198,87 @@ void SctpTransport::runConnect() {
 		// According to the IETF draft, both endpoints must initiate the SCTP association, in a
 		// simultaneous-open manner, irrelevent to the SDP setup role.
 		// See https://tools.ietf.org/html/draft-ietf-mmusic-sctp-sdp-26#section-9.3
-		if (usrsctp_connect(mSock, reinterpret_cast<struct sockaddr *>(&sconn), sizeof(sconn)) !=
-		    0) {
-			std::cerr << "SCTP connection failed, errno=" << errno << std::endl;
-			changeState(State::Failed);
-			mStopping = true;
-			return;
-		}
+		if (usrsctp_connect(mSock, reinterpret_cast<struct sockaddr *>(&sconn), sizeof(sconn)) != 0)
+			throw std::runtime_error("Connection failed, errno=" + std::to_string(errno));
 
 		if (!mStopping)
 			changeState(State::Connected);
 
 	} catch (const std::exception &e) {
 		std::cerr << "SCTP connect: " << e.what() << std::endl;
+		changeState(State::Failed);
+		mStopping = true;
+		mConnectCondition.notify_all();
+		return;
+	}
+
+	try {
+		while (auto message = mSendQueue.pop()) {
+			if (!doSend(*message))
+				throw std::runtime_error("Sending failed, errno=" + std::to_string(errno));
+		}
+	} catch (const std::exception &e) {
+		std::cerr << "SCTP send: " << e.what() << std::endl;
+		mStopping = true;
+		return;
+	}
+}
+
+bool SctpTransport::doSend(message_ptr message) {
+	if (!message)
+		return false;
+
+	const Reliability reliability = message->reliability ? *message->reliability : Reliability();
+
+	struct sctp_sendv_spa spa = {};
+
+	uint32_t ppid;
+	switch (message->type) {
+	case Message::String:
+		ppid = !message->empty() ? PPID_STRING : PPID_STRING_EMPTY;
+		break;
+	case Message::Binary:
+		ppid = !message->empty() ? PPID_BINARY : PPID_BINARY_EMPTY;
+		break;
+	default:
+		ppid = PPID_CONTROL;
+		break;
+	}
+
+	// set sndinfo
+	spa.sendv_flags |= SCTP_SEND_SNDINFO_VALID;
+	spa.sendv_sndinfo.snd_sid = uint16_t(message->stream);
+	spa.sendv_sndinfo.snd_ppid = htonl(ppid);
+	spa.sendv_sndinfo.snd_flags |= SCTP_EOR;
+
+	// set prinfo
+	spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
+	if (reliability.unordered)
+		spa.sendv_sndinfo.snd_flags |= SCTP_UNORDERED;
+
+	using std::chrono::milliseconds;
+	switch (reliability.type) {
+	case Reliability::TYPE_PARTIAL_RELIABLE_REXMIT:
+		spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
+		spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_RTX;
+		spa.sendv_prinfo.pr_value = uint32_t(std::get<int>(reliability.rexmit));
+		break;
+	case Reliability::TYPE_PARTIAL_RELIABLE_TIMED:
+		spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
+		spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_TTL;
+		spa.sendv_prinfo.pr_value = uint32_t(std::get<milliseconds>(reliability.rexmit).count());
+		break;
+	default:
+		spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_NONE;
+		break;
+	}
+
+	if (!message->empty()) {
+		return usrsctp_sendv(mSock, message->data(), message->size(), nullptr, 0, &spa, sizeof(spa),
+		                     SCTP_SENDV_SPA, 0) > 0;
+	} else {
+		const char zero = 0;
+		return usrsctp_sendv(mSock, &zero, 1, nullptr, 0, &spa, sizeof(spa), SCTP_SENDV_SPA, 0) > 0;
 	}
 }
 

+ 3 - 0
src/sctptransport.hpp

@@ -21,6 +21,7 @@
 
 #include "include.hpp"
 #include "peerconnection.hpp"
+#include "queue.hpp"
 #include "transport.hpp"
 
 #include <condition_variable>
@@ -62,6 +63,7 @@ private:
 	void incoming(message_ptr message);
 	void changeState(State state);
 	void runConnect();
+	bool doSend(message_ptr message);
 
 	int handleWrite(void *data, size_t len, uint8_t tos, uint8_t set_df);
 
@@ -74,6 +76,7 @@ private:
 	struct socket *mSock;
 	uint16_t mPort;
 
+	Queue<message_ptr> mSendQueue;
 	std::thread mConnectThread;
 	std::mutex mConnectMutex;
 	std::condition_variable mConnectCondition;