Browse Source

Added PeerConnection::close() and revised state machine

Paul-Louis Ageneau 5 years ago
parent
commit
1d7d1358be
7 changed files with 66 additions and 26 deletions
  1. 1 1
      Makefile
  2. 2 1
      include/rtc/datachannel.hpp
  3. 5 1
      include/rtc/peerconnection.hpp
  4. 2 1
      include/rtc/rtc.h
  5. 13 12
      src/datachannel.cpp
  6. 40 5
      src/peerconnection.cpp
  7. 3 5
      src/sctptransport.cpp

+ 1 - 1
Makefile

@@ -11,7 +11,7 @@ LIBS=glib-2.0 gobject-2.0 nice
 USRSCTP_DIR=usrsctp
 
 USE_GNUTLS ?= 0
-ifeq ($(USE_GNUTLS), 1)
+ifneq ($(USE_GNUTLS), 0)
         CPPFLAGS+= -DUSE_GNUTLS=1
         LIBS+= gnutls
 else

+ 2 - 1
include/rtc/datachannel.hpp

@@ -66,12 +66,13 @@ public:
 	Reliability reliability() const;
 
 private:
+	void remoteClose();
 	void open(std::shared_ptr<SctpTransport> sctpTransport);
 	bool outgoing(mutable_message_ptr message);
 	void incoming(message_ptr message);
 	void processOpenMessage(message_ptr message);
 
-	std::shared_ptr<PeerConnection> mPeerConnection;
+	const std::shared_ptr<PeerConnection> mPeerConnection;
 	std::shared_ptr<SctpTransport> mSctpTransport;
 
 	unsigned int mStream;

+ 5 - 1
include/rtc/peerconnection.hpp

@@ -50,7 +50,8 @@ public:
 		Connected = RTC_CONNECTED,
 		Disconnected = RTC_DISCONNECTED,
 		Failed = RTC_FAILED,
-		Closed = RTC_CLOSED
+		Closed = RTC_CLOSED,
+		Destroying = RTC_DESTROYING
 	};
 
 	enum class GatheringState : int {
@@ -63,6 +64,8 @@ public:
 	PeerConnection(const Configuration &config);
 	~PeerConnection();
 
+	void close();
+
 	const Configuration *config() const;
 	State state() const;
 	GatheringState gatheringState() const;
@@ -94,6 +97,7 @@ private:
 	void iterateDataChannels(std::function<void(std::shared_ptr<DataChannel> channel)> func);
 	void openDataChannels();
 	void closeDataChannels();
+	void remoteCloseDataChannels();
 
 	void processLocalDescription(Description description);
 	void processLocalCandidate(Candidate candidate);

+ 2 - 1
include/rtc/rtc.h

@@ -31,7 +31,8 @@ typedef enum {
 	RTC_CONNECTED = 2,
 	RTC_DISCONNECTED = 3,
 	RTC_FAILED = 4,
-	RTC_CLOSED = 5
+	RTC_CLOSED = 5,
+	RTC_DESTROYING = 6 // internal
 } rtc_state_t;
 
 typedef enum {

+ 13 - 12
src/datachannel.cpp

@@ -73,18 +73,23 @@ DataChannel::DataChannel(shared_ptr<PeerConnection> pc, shared_ptr<SctpTransport
       mReliability(std::make_shared<Reliability>()),
       mRecvQueue(RECV_QUEUE_LIMIT, message_size_func) {}
 
-DataChannel::~DataChannel() { close(); }
+DataChannel::~DataChannel() {
+	close();
+}
 
 void DataChannel::close() {
-	mIsOpen = false;
-	if (!mIsClosed.exchange(true)) {
+	mIsClosed = true;
+	if (mIsOpen.exchange(false))
 		if (mSctpTransport)
 			mSctpTransport->reset(mStream);
-	}
 
-	// Reset mSctpTransport first so SctpTransport is never alive without PeerConnection
 	mSctpTransport.reset();
-	mPeerConnection.reset();
+}
+
+void DataChannel::remoteClose() {
+	mIsOpen = false;
+	if (!mIsClosed.exchange(true))
+		triggerClosed();
 }
 
 bool DataChannel::send(const std::variant<binary, string> &data) {
@@ -108,12 +113,8 @@ std::optional<std::variant<binary, string>> DataChannel::receive() {
 		switch (message->type) {
 		case Message::Control: {
 			auto raw = reinterpret_cast<const uint8_t *>(message->data());
-			if (raw[0] == MESSAGE_CLOSE) {
-				if (mIsOpen) {
-					close();
-					triggerClosed();
-				}
-			}
+			if (raw[0] == MESSAGE_CLOSE)
+				remoteClose();
 			break;
 		}
 		case Message::String:

+ 40 - 5
src/peerconnection.cpp

@@ -38,6 +38,19 @@ PeerConnection::PeerConnection(const Configuration &config)
     : mConfig(config), mCertificate(make_certificate("libdatachannel")), mState(State::New) {}
 
 PeerConnection::~PeerConnection() {
+	changeState(State::Destroying);
+	close();
+	mSctpTransport.reset();
+	mDtlsTransport.reset();
+	mIceTransport.reset();
+}
+
+void PeerConnection::close() {
+	// Close DataChannels
+	closeDataChannels();
+	mDataChannels.clear();
+
+	// Close Transports
 	if (auto transport = std::atomic_load(&mIceTransport))
 		transport->stop();
 	if (auto transport = std::atomic_load(&mDtlsTransport))
@@ -45,9 +58,8 @@ PeerConnection::~PeerConnection() {
 	if (auto transport = std::atomic_load(&mSctpTransport))
 		transport->stop();
 
-	mSctpTransport.reset();
-	mDtlsTransport.reset();
-	mIceTransport.reset();
+	// Change state
+	changeState(State::Closed);
 }
 
 const Configuration *PeerConnection::config() const { return &mConfig; }
@@ -210,6 +222,9 @@ shared_ptr<IceTransport> PeerConnection::initIceTransport(Description::Role role
 		    case IceTransport::State::Connected:
 			    initDtlsTransport();
 			    break;
+		    case IceTransport::State::Disconnected:
+			    changeState(State::Disconnected);
+			    break;
 		    default:
 			    // Ignore
 			    break;
@@ -246,6 +261,9 @@ shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
 		    case DtlsTransport::State::Failed:
 			    changeState(State::Failed);
 			    break;
+		    case DtlsTransport::State::Disconnected:
+			    changeState(State::Disconnected);
+			    break;
 		    default:
 			    // Ignore
 			    break;
@@ -293,7 +311,7 @@ bool PeerConnection::checkFingerprint(const std::string &fingerprint) const {
 
 void PeerConnection::forwardMessage(message_ptr message) {
 	if (!message) {
-		closeDataChannels();
+		remoteCloseDataChannels();
 		return;
 	}
 
@@ -368,6 +386,10 @@ void PeerConnection::closeDataChannels() {
 	iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->close(); });
 }
 
+void PeerConnection::remoteCloseDataChannels() {
+	iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->remoteClose(); });
+}
+
 void PeerConnection::processLocalDescription(Description description) {
 	std::optional<uint16_t> remoteSctpPort;
 	if (auto remote = remoteDescription())
@@ -401,7 +423,14 @@ void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
 }
 
 void PeerConnection::changeState(State state) {
-	if (mState.exchange(state) != state)
+	State current;
+	do {
+		current = mState.load();
+		if (current == state || current == State::Destroying)
+			return;
+	} while (!mState.compare_exchange_weak(current, state));
+
+	if (state != State::Destroying)
 		mStateChangeCallback(state);
 }
 
@@ -431,6 +460,12 @@ std::ostream &operator<<(std::ostream &out, const rtc::PeerConnection::State &st
 	case State::Failed:
 		str = "failed";
 		break;
+	case State::Closed:
+		str = "closed";
+		break;
+	case State::Destroying:
+		str = "destroying";
+		break;
 	default:
 		str = "unknown";
 		break;

+ 3 - 5
src/sctptransport.cpp

@@ -145,12 +145,10 @@ SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
 SctpTransport::~SctpTransport() {
 	stop();
 
-	if (mSock) {
-		usrsctp_shutdown(mSock, SHUT_RDWR);
-		usrsctp_close(mSock);
-	}
-
+	usrsctp_shutdown(mSock, SHUT_RDWR);
+	usrsctp_close(mSock);
 	usrsctp_deregister_address(this);
+
 	GlobalCleanup();
 }