Browse Source

Merge pull request #5 from paullouisageneau/master

Sync with master
Erik Cota-Robles 4 years ago
parent
commit
a99efd27d2

+ 1 - 1
deps/libjuice

@@ -1 +1 @@
-Subproject commit ddc30648908181f6e6222a2fd648d35c2c28b724
+Subproject commit 45aafbc99fc2f95dc38c0abbfe588f2ba6711f6d

+ 1 - 1
deps/usrsctp

@@ -1 +1 @@
-Subproject commit ffed0925f27d404173c1e3e750d818f432d2c019
+Subproject commit 0db969100094422d9ea74a08ae5e5d9a4cfdb06b

+ 7 - 3
examples/signaling-server-python/signaling-server.py

@@ -63,16 +63,20 @@ async def handle_websocket(websocket, path):
             print('Client {} disconnected'.format(client_id))
             print('Client {} disconnected'.format(client_id))
 
 
 if __name__ == '__main__':
 if __name__ == '__main__':
-    port = int(sys.argv[1]) if len(sys.argv) > 1 else 8000
+    # Usage: ./server.py [[host:]port] [SSL certificate file]
+    endpoint_or_port = sys.argv[1] if len(sys.argv) > 1 else "8000"
     ssl_cert = sys.argv[2] if len(sys.argv) > 2 else None
     ssl_cert = sys.argv[2] if len(sys.argv) > 2 else None
 
 
+    endpoint = endpoint_or_port if ':' in endpoint_or_port else "127.0.0.1:" + endpoint_or_port
+
     if ssl_cert:
     if ssl_cert:
         ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
         ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
         ssl_context.load_cert_chain(ssl_cert)
         ssl_context.load_cert_chain(ssl_cert)
     else:
     else:
         ssl_context = None
         ssl_context = None
 
 
-    print('Listening on port {}'.format(port))
-    start_server = websockets.serve(handle_websocket, '127.0.0.1', port, ssl=ssl_context)
+    print('Listening on {}'.format(endpoint))
+    host, port = endpoint.rsplit(':', 1)
+    start_server = websockets.serve(handle_websocket, host, int(port), ssl=ssl_context)
     asyncio.get_event_loop().run_until_complete(start_server)
     asyncio.get_event_loop().run_until_complete(start_server)
     asyncio.get_event_loop().run_forever()
     asyncio.get_event_loop().run_forever()

+ 1 - 1
examples/signaling-server-rust/Cargo.lock

@@ -348,7 +348,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "a9f8082297d534141b30c8d39e9b1773713ab50fdbe4ff30f750d063b3bfd701"
 checksum = "a9f8082297d534141b30c8d39e9b1773713ab50fdbe4ff30f750d063b3bfd701"
 
 
 [[package]]
 [[package]]
-name = "libdatachannel_signaling_example"
+name = "libdatachannel_signaling_server_example"
 version = "0.1.0"
 version = "0.1.0"
 dependencies = [
 dependencies = [
  "futures-channel",
  "futures-channel",

+ 3 - 1
examples/signaling-server-rust/src/main.rs

@@ -92,7 +92,9 @@ async fn handle(clients: ClientsMap, stream: TcpStream) {
 #[tokio::main]
 #[tokio::main]
 async fn main() -> Result<(), std::io::Error> {
 async fn main() -> Result<(), std::io::Error> {
     let service = env::args().nth(1).unwrap_or("8000".to_string());
     let service = env::args().nth(1).unwrap_or("8000".to_string());
-    let endpoint = format!("127.0.0.1:{}", service);
+    let endpoint = if service.contains(':') { service } else { format!("127.0.0.1:{}", service) };
+
+	println!("Listening on {}", endpoint);
 
 
     let mut listener = TcpListener::bind(endpoint)
     let mut listener = TcpListener::bind(endpoint)
     	.await.expect("Listener binding failed");
     	.await.expect("Listener binding failed");

+ 4 - 2
examples/web/server.js

@@ -98,8 +98,10 @@ wsServer.on('request', (req) => {
   clients[id] = conn;
   clients[id] = conn;
 });
 });
 
 
-const hostname = '127.0.0.1';
-const port = 8000;
+const endpoint = process.env.PORT || '8000';
+const splitted = endpoint.split(':');
+const port = splitted.pop();
+const hostname = splitted.join(':') || '127.0.0.1';
 
 
 httpServer.listen(port, hostname, () => {
 httpServer.listen(port, hostname, () => {
   console.log(`Server listening on ${hostname}:${port}`);
   console.log(`Server listening on ${hostname}:${port}`);

+ 2 - 0
include/rtc/description.hpp

@@ -47,6 +47,8 @@ public:
 	Role role() const;
 	Role role() const;
 	string roleString() const;
 	string roleString() const;
 	string bundleMid() const;
 	string bundleMid() const;
+	string iceUfrag() const;
+	string icePwd() const;
 	std::optional<string> fingerprint() const;
 	std::optional<string> fingerprint() const;
 	bool ended() const;
 	bool ended() const;
 
 

+ 3 - 1
include/rtc/peerconnection.hpp

@@ -76,11 +76,13 @@ public:
 	const Configuration *config() const;
 	const Configuration *config() const;
 	State state() const;
 	State state() const;
 	GatheringState gatheringState() const;
 	GatheringState gatheringState() const;
+	bool hasLocalDescription() const;
+	bool hasRemoteDescription() const;
+	bool hasMedia() const;
 	std::optional<Description> localDescription() const;
 	std::optional<Description> localDescription() const;
 	std::optional<Description> remoteDescription() const;
 	std::optional<Description> remoteDescription() const;
 	std::optional<string> localAddress() const;
 	std::optional<string> localAddress() const;
 	std::optional<string> remoteAddress() const;
 	std::optional<string> remoteAddress() const;
-	bool hasMedia() const;
 
 
 	void setLocalDescription();
 	void setLocalDescription();
 	void setRemoteDescription(Description description);
 	void setRemoteDescription(Description description);

+ 37 - 24
include/rtc/queue.hpp

@@ -44,11 +44,15 @@ public:
 	size_t amount() const; // amount
 	size_t amount() const; // amount
 	void push(T element);
 	void push(T element);
 	std::optional<T> pop();
 	std::optional<T> pop();
+	std::optional<T> tryPop();
 	std::optional<T> peek();
 	std::optional<T> peek();
 	std::optional<T> exchange(T element);
 	std::optional<T> exchange(T element);
 	bool wait(const std::optional<std::chrono::milliseconds> &duration = nullopt);
 	bool wait(const std::optional<std::chrono::milliseconds> &duration = nullopt);
 
 
 private:
 private:
+	void pushImpl(T element);
+	std::optional<T> popImpl();
+
 	const size_t mLimit;
 	const size_t mLimit;
 	size_t mAmount;
 	size_t mAmount;
 	std::queue<T> mQueue;
 	std::queue<T> mQueue;
@@ -99,43 +103,32 @@ template <typename T> size_t Queue<T>::amount() const {
 template <typename T> void Queue<T>::push(T element) {
 template <typename T> void Queue<T>::push(T element) {
 	std::unique_lock lock(mMutex);
 	std::unique_lock lock(mMutex);
 	mPushCondition.wait(lock, [this]() { return !mLimit || mQueue.size() < mLimit || mStopping; });
 	mPushCondition.wait(lock, [this]() { return !mLimit || mQueue.size() < mLimit || mStopping; });
-	if (!mStopping) {
-		mAmount += mAmountFunction(element);
-		mQueue.emplace(std::move(element));
-		mPopCondition.notify_one();
-	}
+	pushImpl(std::move(element));
 }
 }
 
 
 template <typename T> std::optional<T> Queue<T>::pop() {
 template <typename T> std::optional<T> Queue<T>::pop() {
 	std::unique_lock lock(mMutex);
 	std::unique_lock lock(mMutex);
 	mPopCondition.wait(lock, [this]() { return !mQueue.empty() || mStopping; });
 	mPopCondition.wait(lock, [this]() { return !mQueue.empty() || mStopping; });
-	if (!mQueue.empty()) {
-		mAmount -= mAmountFunction(mQueue.front());
-		std::optional<T> element{std::move(mQueue.front())};
-		mQueue.pop();
-		return element;
-	} else {
-		return nullopt;
-	}
+	return popImpl();
+}
+
+template <typename T> std::optional<T> Queue<T>::tryPop() {
+	std::unique_lock lock(mMutex);
+	return popImpl();
 }
 }
 
 
 template <typename T> std::optional<T> Queue<T>::peek() {
 template <typename T> std::optional<T> Queue<T>::peek() {
 	std::unique_lock lock(mMutex);
 	std::unique_lock lock(mMutex);
-	if (!mQueue.empty()) {
-		return std::optional<T>{mQueue.front()};
-	} else {
-		return nullopt;
-	}
+	return !mQueue.empty() ? std::make_optional(mQueue.front()) : nullopt;
 }
 }
 
 
 template <typename T> std::optional<T> Queue<T>::exchange(T element) {
 template <typename T> std::optional<T> Queue<T>::exchange(T element) {
 	std::unique_lock lock(mMutex);
 	std::unique_lock lock(mMutex);
-	if (!mQueue.empty()) {
-		std::swap(mQueue.front(), element);
-		return std::optional<T>{element};
-	} else {
+	if (mQueue.empty())
 		return nullopt;
 		return nullopt;
-	}
+
+	std::swap(mQueue.front(), element);
+	return std::make_optional(std::move(element));
 }
 }
 
 
 template <typename T>
 template <typename T>
@@ -145,7 +138,27 @@ bool Queue<T>::wait(const std::optional<std::chrono::milliseconds> &duration) {
 		mPopCondition.wait_for(lock, *duration, [this]() { return !mQueue.empty() || mStopping; });
 		mPopCondition.wait_for(lock, *duration, [this]() { return !mQueue.empty() || mStopping; });
 	else
 	else
 		mPopCondition.wait(lock, [this]() { return !mQueue.empty() || mStopping; });
 		mPopCondition.wait(lock, [this]() { return !mQueue.empty() || mStopping; });
-	return !mStopping;
+
+	return !mQueue.empty();
+}
+
+template <typename T> void Queue<T>::pushImpl(T element) {
+	if (mStopping)
+		return;
+
+	mAmount += mAmountFunction(element);
+	mQueue.emplace(std::move(element));
+	mPopCondition.notify_one();
+}
+
+template <typename T> std::optional<T> Queue<T>::popImpl() {
+	if (mQueue.empty())
+		return nullopt;
+
+	mAmount -= mAmountFunction(mQueue.front());
+	std::optional<T> element{std::move(mQueue.front())};
+	mQueue.pop();
+	return element;
 }
 }
 
 
 } // namespace rtc
 } // namespace rtc

+ 1 - 1
src/candidate.cpp

@@ -49,7 +49,7 @@ namespace rtc {
 
 
 Candidate::Candidate(string candidate, string mid) : mIsResolved(false) {
 Candidate::Candidate(string candidate, string mid) : mIsResolved(false) {
 	const std::array prefixes{"a=", "candidate:"};
 	const std::array prefixes{"a=", "candidate:"};
-	for (string prefix : prefixes)
+	for (const string &prefix : prefixes)
 		if (hasprefix(candidate, prefix))
 		if (hasprefix(candidate, prefix))
 			candidate.erase(0, prefix.size());
 			candidate.erase(0, prefix.size());
 
 

+ 2 - 2
src/datachannel.cpp

@@ -122,8 +122,8 @@ bool DataChannel::send(const byte *data, size_t size) {
 }
 }
 
 
 std::optional<message_variant> DataChannel::receive() {
 std::optional<message_variant> DataChannel::receive() {
-	while (!mRecvQueue.empty()) {
-		auto message = *mRecvQueue.pop();
+	while (auto next = mRecvQueue.tryPop()) {
+		message_ptr message = std::move(*next);
 		if (message->type == Message::Control) {
 		if (message->type == Message::Control) {
 			auto raw = reinterpret_cast<const uint8_t *>(message->data());
 			auto raw = reinterpret_cast<const uint8_t *>(message->data());
 			if (!message->empty() && raw[0] == MESSAGE_CLOSE)
 			if (!message->empty() && raw[0] == MESSAGE_CLOSE)

+ 24 - 8
src/description.cpp

@@ -79,13 +79,15 @@ Description::Description(const string &sdp, Type type, Role role)
 	std::uniform_int_distribution<uint32_t> uniform;
 	std::uniform_int_distribution<uint32_t> uniform;
 	mSessionId = std::to_string(uniform(generator));
 	mSessionId = std::to_string(uniform(generator));
 
 
-	std::istringstream ss(sdp);
-	std::shared_ptr<Entry> current;
-
 	int index = -1;
 	int index = -1;
-	string line;
-	while (std::getline(ss, line) || !line.empty()) {
+	std::shared_ptr<Entry> current;
+	std::istringstream ss(sdp);
+	while (ss) {
+		string line;
+		std::getline(ss, line);
 		trim_end(line);
 		trim_end(line);
+		if (line.empty())
+			continue;
 
 
 		// Media description line (aka m-line)
 		// Media description line (aka m-line)
 		if (match_prefix(line, "m=")) {
 		if (match_prefix(line, "m=")) {
@@ -130,7 +132,13 @@ Description::Description(const string &sdp, Type type, Role role)
 		} else if (current) {
 		} else if (current) {
 			current->parseSdpLine(std::move(line));
 			current->parseSdpLine(std::move(line));
 		}
 		}
-	};
+	}
+
+	if (mIceUfrag.empty())
+		throw std::invalid_argument("Missing ice-ufrag parameter in SDP description");
+
+	if (mIcePwd.empty())
+		throw std::invalid_argument("Missing ice-pwd parameter in SDP description");
 }
 }
 
 
 Description::Type Description::type() const { return mType; }
 Description::Type Description::type() const { return mType; }
@@ -146,6 +154,10 @@ string Description::bundleMid() const {
 	return !mEntries.empty() ? mEntries[0]->mid() : "0";
 	return !mEntries.empty() ? mEntries[0]->mid() : "0";
 }
 }
 
 
+string Description::iceUfrag() const { return mIceUfrag; }
+
+string Description::icePwd() const { return mIcePwd; }
+
 std::optional<string> Description::fingerprint() const { return mFingerprint; }
 std::optional<string> Description::fingerprint() const { return mFingerprint; }
 
 
 bool Description::ended() const { return mEnded; }
 bool Description::ended() const { return mEnded; }
@@ -488,9 +500,13 @@ void Description::Application::parseSdpLine(string_view line) {
 
 
 Description::Media::Media(const string &sdp) : Entry(sdp, "", Direction::Unknown) {
 Description::Media::Media(const string &sdp) : Entry(sdp, "", Direction::Unknown) {
 	std::istringstream ss(sdp);
 	std::istringstream ss(sdp);
-	string line;
-	while (std::getline(ss, line) || !line.empty()) {
+	while (ss) {
+		string line;
+		std::getline(ss, line);
 		trim_end(line);
 		trim_end(line);
+		if (line.empty())
+			continue;
+
 		parseSdpLine(line);
 		parseSdpLine(line);
 	}
 	}
 
 

+ 6 - 6
src/dtlstransport.cpp

@@ -258,7 +258,7 @@ ssize_t DtlsTransport::WriteCallback(gnutls_transport_ptr_t ptr, const void *dat
 ssize_t DtlsTransport::ReadCallback(gnutls_transport_ptr_t ptr, void *data, size_t maxlen) {
 ssize_t DtlsTransport::ReadCallback(gnutls_transport_ptr_t ptr, void *data, size_t maxlen) {
 	DtlsTransport *t = static_cast<DtlsTransport *>(ptr);
 	DtlsTransport *t = static_cast<DtlsTransport *>(ptr);
 	if (auto next = t->mIncomingQueue.pop()) {
 	if (auto next = t->mIncomingQueue.pop()) {
-		auto message = *next;
+		message_ptr message = std::move(*next);
 		ssize_t len = std::min(maxlen, message->size());
 		ssize_t len = std::min(maxlen, message->size());
 		std::memcpy(data, message->data(), len);
 		std::memcpy(data, message->data(), len);
 		gnutls_transport_set_errno(t->mSession, 0);
 		gnutls_transport_set_errno(t->mSession, 0);
@@ -271,9 +271,9 @@ ssize_t DtlsTransport::ReadCallback(gnutls_transport_ptr_t ptr, void *data, size
 
 
 int DtlsTransport::TimeoutCallback(gnutls_transport_ptr_t ptr, unsigned int ms) {
 int DtlsTransport::TimeoutCallback(gnutls_transport_ptr_t ptr, unsigned int ms) {
 	DtlsTransport *t = static_cast<DtlsTransport *>(ptr);
 	DtlsTransport *t = static_cast<DtlsTransport *>(ptr);
-	t->mIncomingQueue.wait(ms != GNUTLS_INDEFINITE_TIMEOUT ? std::make_optional(milliseconds(ms))
-	                                                       : nullopt);
-	return !t->mIncomingQueue.empty() ? 1 : 0;
+	bool notEmpty = t->mIncomingQueue.wait(
+	    ms != GNUTLS_INDEFINITE_TIMEOUT ? std::make_optional(milliseconds(ms)) : nullopt);
+	return notEmpty ? 1 : 0;
 }
 }
 
 
 #else // USE_GNUTLS==0
 #else // USE_GNUTLS==0
@@ -437,8 +437,8 @@ void DtlsTransport::runRecvLoop() {
 		byte buffer[bufferSize];
 		byte buffer[bufferSize];
 		while (true) {
 		while (true) {
 			// Process pending messages
 			// Process pending messages
-			while (!mIncomingQueue.empty()) {
-				auto message = *mIncomingQueue.pop();
+			while (auto next = mIncomingQueue.tryPop()) {
+				message_ptr message = std::move(*next);
 				BIO_write(mInBio, message->data(), int(message->size()));
 				BIO_write(mInBio, message->data(), int(message->size()));
 
 
 				if (state() == State::Connecting) {
 				if (state() == State::Connecting) {

+ 24 - 1
src/icetransport.cpp

@@ -58,8 +58,31 @@ IceTransport::IceTransport(const Configuration &config, Description::Role role,
 	if (config.enableIceTcp) {
 	if (config.enableIceTcp) {
 		PLOG_WARNING << "ICE-TCP is not supported with libjuice";
 		PLOG_WARNING << "ICE-TCP is not supported with libjuice";
 	}
 	}
+
+	juice_log_level_t level;
+	switch (plog::get()->getMaxSeverity()) {
+	case plog::none:
+		level = JUICE_LOG_LEVEL_NONE;
+		break;
+	case plog::fatal:
+		level = JUICE_LOG_LEVEL_VERBOSE;
+		break;
+	case plog::error:
+		level = JUICE_LOG_LEVEL_ERROR;
+		break;
+	case plog::warning:
+		level = JUICE_LOG_LEVEL_WARN;
+		break;
+	case plog::info:
+	case plog::debug: // juice debug is output as verbose
+		level = JUICE_LOG_LEVEL_INFO;
+		break;
+	default:
+		level = JUICE_LOG_LEVEL_VERBOSE;
+		break;
+	}
 	juice_set_log_handler(IceTransport::LogCallback);
 	juice_set_log_handler(IceTransport::LogCallback);
-	juice_set_log_level(JUICE_LOG_LEVEL_VERBOSE);
+	juice_set_log_level(level);
 
 
 	juice_config_t jconfig = {};
 	juice_config_t jconfig = {};
 	jconfig.cb_state_changed = IceTransport::StateChangeCallback;
 	jconfig.cb_state_changed = IceTransport::StateChangeCallback;

+ 52 - 14
src/peerconnection.cpp

@@ -82,11 +82,26 @@ std::optional<Description> PeerConnection::remoteDescription() const {
 	return mRemoteDescription;
 	return mRemoteDescription;
 }
 }
 
 
+bool PeerConnection::hasLocalDescription() const {
+	std::lock_guard lock(mLocalDescriptionMutex);
+	return bool(mLocalDescription);
+}
+
+bool PeerConnection::hasRemoteDescription() const {
+	std::lock_guard lock(mRemoteDescriptionMutex);
+	return bool(mRemoteDescription);
+}
+
+bool PeerConnection::hasMedia() const {
+	auto local = localDescription();
+	return local && local->hasAudioOrVideo();
+}
+
 void PeerConnection::setLocalDescription() {
 void PeerConnection::setLocalDescription() {
 	PLOG_VERBOSE << "Setting local description";
 	PLOG_VERBOSE << "Setting local description";
 
 
 	if (std::atomic_load(&mIceTransport)) {
 	if (std::atomic_load(&mIceTransport)) {
-		PLOG_DEBUG << "Local description is already set";
+		PLOG_DEBUG << "Local description is already set, ignoring";
 	}
 	}
 
 
 	// RFC 5763: The endpoint that is the offerer MUST use the setup attribute value of
 	// RFC 5763: The endpoint that is the offerer MUST use the setup attribute value of
@@ -101,6 +116,9 @@ void PeerConnection::setLocalDescription() {
 void PeerConnection::setRemoteDescription(Description description) {
 void PeerConnection::setRemoteDescription(Description description) {
 	PLOG_VERBOSE << "Setting remote description: " << string(description);
 	PLOG_VERBOSE << "Setting remote description: " << string(description);
 
 
+	if (hasRemoteDescription())
+		throw std::logic_error("Remote description is already set");
+
 	if (description.mediaCount() == 0)
 	if (description.mediaCount() == 0)
 		throw std::invalid_argument("Remote description has no media line");
 		throw std::invalid_argument("Remote description has no media line");
 
 
@@ -120,9 +138,27 @@ void PeerConnection::setRemoteDescription(Description description) {
 	if (!description.fingerprint())
 	if (!description.fingerprint())
 		throw std::invalid_argument("Remote description has no fingerprint");
 		throw std::invalid_argument("Remote description has no fingerprint");
 
 
-	description.hintType(localDescription() ? Description::Type::Answer : Description::Type::Offer);
-	auto type = description.type();
-	auto remoteCandidates = description.extractCandidates(); // Candidates will be added at the end
+	description.hintType(hasLocalDescription() ? Description::Type::Answer
+	                                           : Description::Type::Offer);
+
+	if (description.type() == Description::Type::Offer) {
+		if (hasLocalDescription()) {
+			PLOG_ERROR << "Got a remote offer description while an answer was expected";
+			throw std::logic_error("Got an unexpected remote offer description");
+		}
+	} else { // Answer
+		if (auto local = localDescription()) {
+			if (description.iceUfrag() == local->iceUfrag() &&
+			    description.icePwd() == local->icePwd())
+				throw std::logic_error("Got the local description as remote description");
+		} else {
+			PLOG_ERROR << "Got a remote answer description while an offer was expected";
+			throw std::logic_error("Got an unexpected remote answer description");
+		}
+	}
+
+	// Candidates will be added at the end, extract them for now
+	auto remoteCandidates = description.extractCandidates();
 
 
 	auto iceTransport = std::atomic_load(&mIceTransport);
 	auto iceTransport = std::atomic_load(&mIceTransport);
 	if (!iceTransport)
 	if (!iceTransport)
@@ -130,11 +166,12 @@ void PeerConnection::setRemoteDescription(Description description) {
 	iceTransport->setRemoteDescription(description);
 	iceTransport->setRemoteDescription(description);
 
 
 	{
 	{
+		// Set as remote description
 		std::lock_guard lock(mRemoteDescriptionMutex);
 		std::lock_guard lock(mRemoteDescriptionMutex);
 		mRemoteDescription.emplace(std::move(description));
 		mRemoteDescription.emplace(std::move(description));
 	}
 	}
 
 
-	if (type == Description::Type::Offer) {
+	if (description.type() == Description::Type::Offer) {
 		// This is an offer and we are the answerer.
 		// This is an offer and we are the answerer.
 		Description localDescription = iceTransport->getLocalDescription(Description::Type::Answer);
 		Description localDescription = iceTransport->getLocalDescription(Description::Type::Answer);
 		processLocalDescription(localDescription);
 		processLocalDescription(localDescription);
@@ -161,7 +198,7 @@ void PeerConnection::setRemoteDescription(Description description) {
 
 
 	for (const auto &candidate : remoteCandidates)
 	for (const auto &candidate : remoteCandidates)
 		addRemoteCandidate(candidate);
 		addRemoteCandidate(candidate);
-	}
+}
 
 
 void PeerConnection::addRemoteCandidate(Candidate candidate) {
 void PeerConnection::addRemoteCandidate(Candidate candidate) {
 	PLOG_VERBOSE << "Adding remote candidate: " << string(candidate);
 	PLOG_VERBOSE << "Adding remote candidate: " << string(candidate);
@@ -250,13 +287,8 @@ void PeerConnection::onGatheringStateChange(std::function<void(GatheringState st
 	mGatheringStateChangeCallback = callback;
 	mGatheringStateChangeCallback = callback;
 }
 }
 
 
-bool PeerConnection::hasMedia() const {
-	auto local = localDescription();
-	return local && local->hasAudioOrVideo();
-}
-
 std::shared_ptr<Track> PeerConnection::addTrack(Description::Media description) {
 std::shared_ptr<Track> PeerConnection::addTrack(Description::Media description) {
-	if (localDescription())
+	if (hasLocalDescription())
 		throw std::logic_error("Tracks must be created before local description");
 		throw std::logic_error("Tracks must be created before local description");
 
 
 	if (auto it = mTracks.find(description.mid()); it != mTracks.end())
 	if (auto it = mTracks.find(description.mid()); it != mTracks.end())
@@ -699,6 +731,9 @@ void PeerConnection::openTracks() {
 void PeerConnection::processLocalDescription(Description description) {
 void PeerConnection::processLocalDescription(Description description) {
 	int activeMediaCount = 0;
 	int activeMediaCount = 0;
 
 
+	if (hasLocalDescription())
+		throw std::logic_error("Local description is already set");
+
 	if (auto remote = remoteDescription()) {
 	if (auto remote = remoteDescription()) {
 		// Reciprocate remote description
 		// Reciprocate remote description
 		for (int i = 0; i < remote->mediaCount(); ++i)
 		for (int i = 0; i < remote->mediaCount(); ++i)
@@ -782,8 +817,11 @@ void PeerConnection::processLocalDescription(Description description) {
 	// Set local fingerprint (wait for certificate if necessary)
 	// Set local fingerprint (wait for certificate if necessary)
 	description.setFingerprint(mCertificate.get()->fingerprint());
 	description.setFingerprint(mCertificate.get()->fingerprint());
 
 
-	std::lock_guard lock(mLocalDescriptionMutex);
-	mLocalDescription.emplace(std::move(description));
+	{
+		// Set as local description
+		std::lock_guard lock(mLocalDescriptionMutex);
+		mLocalDescription.emplace(std::move(description));
+	}
 
 
 	mProcessor->enqueue([this, description = *mLocalDescription]() {
 	mProcessor->enqueue([this, description = *mLocalDescription]() {
 		PLOG_VERBOSE << "Issuing local description: " << description;
 		PLOG_VERBOSE << "Issuing local description: " << description;

+ 6 - 12
src/sctptransport.cpp

@@ -322,7 +322,7 @@ void SctpTransport::incoming(message_ptr message) {
 bool SctpTransport::trySendQueue() {
 bool SctpTransport::trySendQueue() {
 	// Requires mSendMutex to be locked
 	// Requires mSendMutex to be locked
 	while (auto next = mSendQueue.peek()) {
 	while (auto next = mSendQueue.peek()) {
-		auto message = *next;
+		message_ptr message = std::move(*next);
 		if (!trySendMessage(message))
 		if (!trySendMessage(message))
 			return false;
 			return false;
 		mSendQueue.pop();
 		mSendQueue.pop();
@@ -702,8 +702,9 @@ std::optional<milliseconds> SctpTransport::rtt() {
 }
 }
 
 
 int SctpTransport::RecvCallback(struct socket *sock, union sctp_sockstore addr, void *data,
 int SctpTransport::RecvCallback(struct socket *sock, union sctp_sockstore addr, void *data,
-                                size_t len, struct sctp_rcvinfo recv_info, int flags, void *ptr) {
-	auto *transport = static_cast<SctpTransport *>(ptr);
+                                size_t len, struct sctp_rcvinfo recv_info, int flags,
+                                void *ulp_info) {
+	auto *transport = static_cast<SctpTransport *>(ulp_info);
 
 
 	std::shared_lock lock(InstancesMutex);
 	std::shared_lock lock(InstancesMutex);
 	if (Instances.find(transport) == Instances.end()) {
 	if (Instances.find(transport) == Instances.end()) {
@@ -717,15 +718,8 @@ int SctpTransport::RecvCallback(struct socket *sock, union sctp_sockstore addr,
 	return ret;
 	return ret;
 }
 }
 
 
-int SctpTransport::SendCallback(struct socket *sock, uint32_t sb_free) {
-	struct sctp_paddrinfo paddrinfo = {};
-	socklen_t len = sizeof(paddrinfo);
-	if (usrsctp_getsockopt(sock, IPPROTO_SCTP, SCTP_GET_PEER_ADDR_INFO, &paddrinfo, &len))
-		return -1;
-
-	auto sconn = reinterpret_cast<struct sockaddr_conn *>(&paddrinfo.spinfo_address);
-	void *ptr = sconn->sconn_addr;
-	auto *transport = static_cast<SctpTransport *>(ptr);
+int SctpTransport::SendCallback(struct socket *, uint32_t sb_free, void *ulp_info) {
+	auto *transport = static_cast<SctpTransport *>(ulp_info);
 
 
 	std::shared_lock lock(InstancesMutex);
 	std::shared_lock lock(InstancesMutex);
 	if (Instances.find(transport) == Instances.end())
 	if (Instances.find(transport) == Instances.end())

+ 2 - 2
src/sctptransport.hpp

@@ -110,8 +110,8 @@ private:
 	std::atomic<size_t> mBytesSent = 0, mBytesReceived = 0;
 	std::atomic<size_t> mBytesSent = 0, mBytesReceived = 0;
 
 
 	static int RecvCallback(struct socket *sock, union sctp_sockstore addr, void *data, size_t len,
 	static int RecvCallback(struct socket *sock, union sctp_sockstore addr, void *data, size_t len,
-	                        struct sctp_rcvinfo recv_info, int flags, void *user_data);
-	static int SendCallback(struct socket *sock, uint32_t sb_free);
+	                        struct sctp_rcvinfo recv_info, int flags, void *ulp_info);
+	static int SendCallback(struct socket *sock, uint32_t sb_free, void *ulp_info);
 	static int WriteCallback(void *sctp_ptr, void *data, size_t len, uint8_t tos, uint8_t set_df);
 	static int WriteCallback(void *sctp_ptr, void *data, size_t len, uint8_t tos, uint8_t set_df);
 
 
 	static std::unordered_set<SctpTransport *> Instances;
 	static std::unordered_set<SctpTransport *> Instances;

+ 1 - 1
src/tcptransport.cpp

@@ -271,7 +271,7 @@ void TcpTransport::close() {
 bool TcpTransport::trySendQueue() {
 bool TcpTransport::trySendQueue() {
 	// mSockMutex must be locked
 	// mSockMutex must be locked
 	while (auto next = mSendQueue.peek()) {
 	while (auto next = mSendQueue.peek()) {
-		auto message = *next;
+		message_ptr message = std::move(*next);
 		if (!trySendMessage(message)) {
 		if (!trySendMessage(message)) {
 			mSendQueue.exchange(message);
 			mSendQueue.exchange(message);
 			return false;
 			return false;

+ 4 - 6
src/tlstransport.cpp

@@ -238,11 +238,9 @@ ssize_t TlsTransport::ReadCallback(gnutls_transport_ptr_t ptr, void *data, size_
 
 
 int TlsTransport::TimeoutCallback(gnutls_transport_ptr_t ptr, unsigned int ms) {
 int TlsTransport::TimeoutCallback(gnutls_transport_ptr_t ptr, unsigned int ms) {
 	TlsTransport *t = static_cast<TlsTransport *>(ptr);
 	TlsTransport *t = static_cast<TlsTransport *>(ptr);
-	if (ms != GNUTLS_INDEFINITE_TIMEOUT)
-		t->mIncomingQueue.wait(milliseconds(ms));
-	else
-		t->mIncomingQueue.wait();
-	return !t->mIncomingQueue.empty() ? 1 : 0;
+	bool notEmpty = t->mIncomingQueue.wait(
+	    ms != GNUTLS_INDEFINITE_TIMEOUT ? std::make_optional(milliseconds(ms)) : nullopt);
+	return notEmpty ? 1 : 0;
 }
 }
 
 
 #else // USE_GNUTLS==0
 #else // USE_GNUTLS==0
@@ -413,7 +411,7 @@ void TlsTransport::runRecvLoop() {
 			if (!next)
 			if (!next)
 				break;
 				break;
 
 
-			message_ptr message = *next;
+			message_ptr message = std::move(*next);
 			if (message->size() > 0)
 			if (message->size() > 0)
 				BIO_write(mInBio, message->data(), int(message->size())); // Input
 				BIO_write(mInBio, message->data(), int(message->size())); // Input
 			else
 			else

+ 2 - 2
src/track.cpp

@@ -45,8 +45,8 @@ bool Track::send(const byte *data, size_t size) {
 }
 }
 
 
 std::optional<message_variant> Track::receive() {
 std::optional<message_variant> Track::receive() {
-	if (!mRecvQueue.empty())
-		return to_variant(std::move(**mRecvQueue.pop()));
+	if (auto next = mRecvQueue.tryPop())
+		return to_variant(std::move(**next));
 
 
 	return nullopt;
 	return nullopt;
 }
 }

+ 2 - 2
src/websocket.cpp

@@ -110,8 +110,8 @@ bool WebSocket::isClosed() const { return mState == State::Closed; }
 size_t WebSocket::maxMessageSize() const { return DEFAULT_MAX_MESSAGE_SIZE; }
 size_t WebSocket::maxMessageSize() const { return DEFAULT_MAX_MESSAGE_SIZE; }
 
 
 std::optional<message_variant> WebSocket::receive() {
 std::optional<message_variant> WebSocket::receive() {
-	while (!mRecvQueue.empty()) {
-		auto message = *mRecvQueue.pop();
+	while (auto next = mRecvQueue.tryPop()) {
+		message_ptr message = std::move(*next);
 		if (message->type != Message::Control)
 		if (message->type != Message::Control)
 			return to_variant(std::move(*message));
 			return to_variant(std::move(*message));
 	}
 	}