Explorar o código

Merge pull request #576 from paullouisageneau/onclose-behavior

Call onClose callback synchonously when close() is called
Paul-Louis Ageneau %!s(int64=3) %!d(string=hai) anos
pai
achega
fa18a6bae6

+ 33 - 16
DOC.md

@@ -405,50 +405,67 @@ int rtcSetAvailableCallback(int id, rtcAvailableCallbackFunc cb)
 
 It is called when messages are now available to be received with `rtcReceiveMessage`.
 
-#### rtcIsOpen
+#### rtcSendMessage
 
 ```
-bool rtcIsOpen(int id)
+int rtcSendMessage(int id, const char *data, int size)
 ```
 
+Sends a message in the channel.
+
 Arguments:
 
 - `id`: the channel identifier
+- `data`: the message data
+- `size`: if size >= 0, `data` is interpreted as a binary message of length `size`, otherwise it is interpreted as a null-terminated UTF-8 string.
 
-Return value: `true` if the channel exists and is open, `false` otherwise
+Return value: `RTC_ERR_SUCCESS` or a negative error code
 
-#### rtcIsClosed
+The message is sent immediately if possible, otherwise it is buffered to be sent later.
+
+Data Channel and WebSocket: If the message may not be sent immediately due to flow control or congestion control, it is buffered until it can actually be sent. You can retrieve the current buffered data size with `rtcGetBufferedAmount`.
+
+Track: There is no flow or congestion control, messages are never buffered and `rtcGetBufferedAmount` always returns 0.
+
+#### rtcClose
 
 ```
-bool rtcIsClosed(int id)
+int rtcClose(int id)
 ```
 
+Close the channel.
+
 Arguments:
 
 - `id`: the channel identifier
 
-Return value: `true` if the channel exists and is closed (not open and not connecting), `false` otherwise
+Return value: `RTC_ERR_SUCCESS` or a negative error code
 
-#### rtcSendMessage
+WebSocket: Like with the JavaScript API, the state will first change to closing, then closed only after the connection has been actually closed.
+
+#### rtcIsOpen
 
 ```
-int rtcSendMessage(int id, const char *data, int size)
+bool rtcIsOpen(int id)
 ```
 
-Sends a message in the channel.
-
 Arguments:
 
 - `id`: the channel identifier
-- `data`: the message data
-- `size`: if size >= 0, `data` is interpreted as a binary message of length `size`, otherwise it is interpreted as a null-terminated UTF-8 string.
 
-Return value: `RTC_ERR_SUCCESS` or a negative error code
+Return value: `true` if the channel exists and is open, `false` otherwise
 
-The message is sent immediately if possible, otherwise it is buffered to be sent later.
+#### rtcIsClosed
 
-Data Channel and WebSocket: If the message may not be sent immediately due to flow control or congestion control, it is buffered until it can actually be sent. You can retrieve the current buffered data size with `rtcGetBufferedAmount`.
-Tracks are an exception: There is no flow or congestion control, messages are never buffered and `rtcGetBufferedAmount` always returns 0.
+```
+bool rtcIsClosed(int id)
+```
+
+Arguments:
+
+- `id`: the channel identifier
+
+Return value: `true` if the channel exists and is closed (not open and not connecting), `false` otherwise
 
 #### rtcGetBufferedAmount
 

+ 2 - 0
include/rtc/channel.hpp

@@ -54,6 +54,8 @@ public:
 	void onBufferedAmountLow(std::function<void()> callback);
 	void setBufferedAmountLowThreshold(size_t amount);
 
+	void resetCallbacks();
+
 	// Extended API
 	optional<message_variant> receive(); // only if onMessage unset
 	optional<message_variant> peek();    // only if onMessage unset

+ 2 - 0
include/rtc/peerconnection.hpp

@@ -103,6 +103,8 @@ public:
 	void onGatheringStateChange(std::function<void(GatheringState state)> callback);
 	void onSignalingStateChange(std::function<void(SignalingState state)> callback);
 
+	void resetCallbacks();
+
 	// Stats
 	void clearStats();
 	size_t bytesSent();

+ 1 - 0
include/rtc/rtc.h

@@ -196,6 +196,7 @@ RTC_EXPORT int rtcSetClosedCallback(int id, rtcClosedCallbackFunc cb);
 RTC_EXPORT int rtcSetErrorCallback(int id, rtcErrorCallbackFunc cb);
 RTC_EXPORT int rtcSetMessageCallback(int id, rtcMessageCallbackFunc cb);
 RTC_EXPORT int rtcSendMessage(int id, const char *data, int size);
+RTC_EXPORT int rtcClose(int id);
 RTC_EXPORT bool rtcIsOpen(int id);
 RTC_EXPORT bool rtcIsClosed(int id);
 

+ 1 - 0
include/rtc/websocket.hpp

@@ -59,6 +59,7 @@ public:
 
 	void open(const string &url);
 	void close() override;
+	void forceClose();
 	bool send(const message_variant data) override;
 	bool send(const byte *data, size_t size) override;
 

+ 33 - 16
pages/content/pages/reference.md

@@ -408,50 +408,67 @@ int rtcSetAvailableCallback(int id, rtcAvailableCallbackFunc cb)
 
 It is called when messages are now available to be received with `rtcReceiveMessage`.
 
-#### rtcIsOpen
+#### rtcSendMessage
 
 ```
-bool rtcIsOpen(int id)
+int rtcSendMessage(int id, const char *data, int size)
 ```
 
+Sends a message in the channel.
+
 Arguments:
 
 - `id`: the channel identifier
+- `data`: the message data
+- `size`: if size >= 0, `data` is interpreted as a binary message of length `size`, otherwise it is interpreted as a null-terminated UTF-8 string.
 
-Return value: `true` if the channel exists and is open, `false` otherwise
+Return value: `RTC_ERR_SUCCESS` or a negative error code
 
-#### rtcIsClosed
+The message is sent immediately if possible, otherwise it is buffered to be sent later.
+
+Data Channel and WebSocket: If the message may not be sent immediately due to flow control or congestion control, it is buffered until it can actually be sent. You can retrieve the current buffered data size with `rtcGetBufferedAmount`.
+
+Track: There is no flow or congestion control, messages are never buffered and `rtcGetBufferedAmount` always returns 0.
+
+#### rtcClose
 
 ```
-bool rtcIsClosed(int id)
+int rtcClose(int id)
 ```
 
+Close the channel.
+
 Arguments:
 
 - `id`: the channel identifier
 
-Return value: `true` if the channel exists and is closed (not open and not connecting), `false` otherwise
+Return value: `RTC_ERR_SUCCESS` or a negative error code
 
-#### rtcSendMessage
+WebSocket: Like with the JavaScript API, the state will first change to closing, then closed only after the connection has been actually closed.
+
+#### rtcIsOpen
 
 ```
-int rtcSendMessage(int id, const char *data, int size)
+bool rtcIsOpen(int id)
 ```
 
-Sends a message in the channel.
-
 Arguments:
 
 - `id`: the channel identifier
-- `data`: the message data
-- `size`: if size >= 0, `data` is interpreted as a binary message of length `size`, otherwise it is interpreted as a null-terminated UTF-8 string.
 
-Return value: `RTC_ERR_SUCCESS` or a negative error code
+Return value: `true` if the channel exists and is open, `false` otherwise
 
-The message is sent immediately if possible, otherwise it is buffered to be sent later.
+#### rtcIsClosed
 
-Data Channel and WebSocket: If the message may not be sent immediately due to flow control or congestion control, it is buffered until it can actually be sent. You can retrieve the current buffered data size with `rtcGetBufferedAmount`.
-Tracks are an exception: There is no flow or congestion control, messages are never buffered and `rtcGetBufferedAmount` always returns 0.
+```
+bool rtcIsClosed(int id)
+```
+
+Arguments:
+
+- `id`: the channel identifier
+
+Return value: `true` if the channel exists and is closed (not open and not connecting), `false` otherwise
 
 #### rtcGetBufferedAmount
 

+ 18 - 34
src/capi.cpp

@@ -381,13 +381,7 @@ int rtcCreatePeerConnection(const rtcConfiguration *config) {
 int rtcDeletePeerConnection(int pc) {
 	return wrap([pc] {
 		auto peerConnection = getPeerConnection(pc);
-		peerConnection->onDataChannel(nullptr);
-		peerConnection->onTrack(nullptr);
-		peerConnection->onLocalDescription(nullptr);
-		peerConnection->onLocalCandidate(nullptr);
-		peerConnection->onStateChange(nullptr);
-		peerConnection->onGatheringStateChange(nullptr);
-
+		peerConnection->close();
 		erasePeerConnection(pc);
 		return RTC_ERR_SUCCESS;
 	});
@@ -698,6 +692,14 @@ int rtcSendMessage(int id, const char *data, int size) {
 	});
 }
 
+int rtcClose(int id) {
+	return wrap([&] {
+		auto channel = getChannel(id);
+		channel->close();
+		return RTC_ERR_SUCCESS;
+	});
+}
+
 bool rtcIsOpen(int id) {
 	return wrap([id] { return getChannel(id)->isOpen() ? 0 : 1; }) == 0 ? true : false;
 }
@@ -836,13 +838,7 @@ int rtcCreateDataChannelEx(int pc, const char *label, const rtcDataChannelInit *
 int rtcDeleteDataChannel(int dc) {
 	return wrap([dc] {
 		auto dataChannel = getDataChannel(dc);
-		dataChannel->onOpen(nullptr);
-		dataChannel->onClosed(nullptr);
-		dataChannel->onError(nullptr);
-		dataChannel->onMessage(nullptr);
-		dataChannel->onBufferedAmountLow(nullptr);
-		dataChannel->onAvailable(nullptr);
-
+		dataChannel->close();
 		eraseDataChannel(dc);
 		return RTC_ERR_SUCCESS;
 	});
@@ -994,13 +990,7 @@ int rtcAddTrackEx(int pc, const rtcTrackInit *init) {
 int rtcDeleteTrack(int tr) {
 	return wrap([&] {
 		auto track = getTrack(tr);
-		track->onOpen(nullptr);
-		track->onClosed(nullptr);
-		track->onError(nullptr);
-		track->onMessage(nullptr);
-		track->onBufferedAmountLow(nullptr);
-		track->onAvailable(nullptr);
-
+		track->close();
 		eraseTrack(tr);
 		return RTC_ERR_SUCCESS;
 	});
@@ -1175,7 +1165,7 @@ int rtcGetTrackPayloadTypesForCodec(int tr, const char *ccodec, int *buffer, int
 		auto codec = lowercased(string(ccodec));
 		auto description = track->description();
 		std::vector<int> payloadTypes;
-		for(int pt : description.payloadTypes())
+		for (int pt : description.payloadTypes())
 			if (lowercased(description.rtpMap(pt)->format) == codec)
 				payloadTypes.push_back(pt);
 
@@ -1276,13 +1266,8 @@ int rtcCreateWebSocketEx(const char *url, const rtcWsConfiguration *config) {
 int rtcDeleteWebSocket(int ws) {
 	return wrap([&] {
 		auto webSocket = getWebSocket(ws);
-		webSocket->onOpen(nullptr);
-		webSocket->onClosed(nullptr);
-		webSocket->onError(nullptr);
-		webSocket->onMessage(nullptr);
-		webSocket->onBufferedAmountLow(nullptr);
-		webSocket->onAvailable(nullptr);
-
+		webSocket->forceClose();
+		webSocket->resetCallbacks(); // not done on close by WebSocket
 		eraseWebSocket(ws);
 		return RTC_ERR_SUCCESS;
 	});
@@ -1345,7 +1330,6 @@ RTC_EXPORT int rtcDeleteWebSocketServer(int wsserver) {
 		auto webSocketServer = getWebSocketServer(wsserver);
 		webSocketServer->onClient(nullptr);
 		webSocketServer->stop();
-
 		eraseWebSocketServer(wsserver);
 		return RTC_ERR_SUCCESS;
 	});
@@ -1371,12 +1355,13 @@ void rtcPreload() {
 void rtcCleanup() {
 	try {
 		size_t count = eraseAll();
-		if(count != 0) {
+		if (count != 0) {
 			PLOG_INFO << count << " objects were not properly destroyed before cleanup";
 		}
 
-		if(rtc::Cleanup().wait_for(10s) == std::future_status::timeout)
-			throw std::runtime_error("Cleanup timeout (possible deadlock or undestructible object)");
+		if (rtc::Cleanup().wait_for(10s) == std::future_status::timeout)
+			throw std::runtime_error(
+			    "Cleanup timeout (possible deadlock or undestructible object)");
 
 	} catch (const std::exception &e) {
 		PLOG_ERROR << e.what();
@@ -1432,4 +1417,3 @@ int rtcSetSctpSettings(const rtcSctpSettings *settings) {
 		return RTC_ERR_SUCCESS;
 	});
 }
-

+ 4 - 0
src/channel.cpp

@@ -59,6 +59,10 @@ void Channel::setBufferedAmountLowThreshold(size_t amount) {
 	impl()->bufferedAmountLowThreshold = amount;
 }
 
+void Channel::resetCallbacks() {
+	impl()->resetCallbacks();
+}
+
 optional<message_variant> Channel::receive() { return impl()->receive(); }
 
 optional<message_variant> Channel::peek() { return impl()->peek(); }

+ 5 - 4
src/impl/datachannel.cpp

@@ -100,18 +100,19 @@ void DataChannel::close() {
 		transport = mSctpTransport.lock();
 	}
 
-	mIsClosed = true;
 	if (mIsOpen.exchange(false) && transport)
 		transport->closeStream(mStream);
 
+	if (!mIsClosed.exchange(true))
+		triggerClosed();
+
 	resetCallbacks();
 }
 
 void DataChannel::remoteClose() {
+	mIsOpen = false;
 	if (!mIsClosed.exchange(true))
 		triggerClosed();
-
-	mIsOpen = false;
 }
 
 optional<message_variant> DataChannel::receive() {
@@ -187,7 +188,7 @@ void DataChannel::open(shared_ptr<SctpTransport> transport) {
 		mSctpTransport = transport;
 	}
 
-	if (!mIsOpen.exchange(true))
+	if (!mIsClosed && !mIsOpen.exchange(true))
 		triggerOpen();
 }
 

+ 4 - 2
src/impl/track.cpp

@@ -64,7 +64,8 @@ void Track::setDescription(Description::Media description) {
 void Track::close() {
 	PLOG_VERBOSE << "Closing Track";
 
-	mIsClosed = true;
+	if (!mIsClosed.exchange(true))
+		triggerClosed();
 
 	setMediaHandler(nullptr);
 	resetCallbacks();
@@ -112,7 +113,8 @@ void Track::open(shared_ptr<DtlsSrtpTransport> transport) {
 		mDtlsSrtpTransport = transport;
 	}
 
-	triggerOpen();
+	if (!mIsClosed)
+		triggerOpen();
 }
 #endif
 

+ 4 - 0
src/peerconnection.cpp

@@ -320,6 +320,10 @@ void PeerConnection::onSignalingStateChange(std::function<void(SignalingState st
 	impl()->signalingStateChangeCallback = callback;
 }
 
+void PeerConnection::resetCallbacks() {
+	impl()->resetCallbacks();
+}
+
 bool PeerConnection::getSelectedCandidatePair(Candidate *local, Candidate *remote) {
 	auto iceTransport = impl()->getIceTransport();
 	return iceTransport ? iceTransport->getSelectedCandidatePair(local, remote) : false;

+ 2 - 0
src/websocket.cpp

@@ -60,6 +60,8 @@ void WebSocket::open(const string &url) {
 
 void WebSocket::close() { impl()->close(); }
 
+void WebSocket::forceClose() { impl()->remoteClose(); }
+
 bool WebSocket::send(message_variant data) {
 	return impl()->outgoing(make_message(std::move(data)));
 }

+ 15 - 2
test/capi_track.cpp

@@ -78,6 +78,7 @@ static void RTC_API openCallback(int id, void *ptr) {
 static void RTC_API closedCallback(int id, void *ptr) {
 	Peer *peer = (Peer *)ptr;
 	peer->connected = false;
+	printf("Track %d: Closed\n", peer == peer1 ? 1 : 2);
 }
 
 static void RTC_API trackCallback(int pc, int tr, void *ptr) {
@@ -89,16 +90,25 @@ static void RTC_API trackCallback(int pc, int tr, void *ptr) {
 	char buffer[1024];
 	if (rtcGetTrackDescription(tr, buffer, 1024) >= 0)
 		printf("Track %d: Received with media description: \n%s\n", peer == peer1 ? 1 : 2, buffer);
+	else
+		fprintf(stderr, "rtcGetTrackDescription failed\n");
 }
 
 static Peer *createPeer(const rtcConfiguration *config) {
 	Peer *peer = (Peer *)malloc(sizeof(Peer));
 	if (!peer)
 		return nullptr;
+
 	memset(peer, 0, sizeof(Peer));
 
 	// Create peer connection
 	peer->pc = rtcCreatePeerConnection(config);
+	if (peer->pc < 0) {
+		fprintf(stderr, "PeerConnection creation failed\n");
+		free(peer);
+		return nullptr;
+	}
+
 	rtcSetUserPointer(peer->pc, peer);
 	rtcSetTrackCallback(peer->pc, trackCallback);
 	rtcSetLocalDescriptionCallback(peer->pc, descriptionCallback);
@@ -106,15 +116,18 @@ static Peer *createPeer(const rtcConfiguration *config) {
 	rtcSetStateChangeCallback(peer->pc, stateChangeCallback);
 	rtcSetGatheringStateChangeCallback(peer->pc, gatheringStateCallback);
 
+	peer->tr = -1;
 	return peer;
 }
 
 static void deletePeer(Peer *peer) {
 	if (peer) {
-		if (peer->tr)
+		if (peer->tr >= 0)
 			rtcDeleteTrack(peer->tr);
-		if (peer->pc)
+
+		if (peer->pc >= 0)
 			rtcDeletePeerConnection(peer->pc);
+
 		free(peer);
 	}
 }

+ 14 - 0
test/connectivity.cpp

@@ -112,6 +112,10 @@ void test_connectivity() {
 				dc->send("Hello from 2");
 		});
 
+		dc->onClosed([]() {
+			cout << "DataChannel 2: Closed" << endl;
+		});
+
 		dc->onMessage([](variant<binary, string> message) {
 			if (holds_alternative<string>(message)) {
 				cout << "Message 2: " << get<string>(message) << endl;
@@ -130,6 +134,10 @@ void test_connectivity() {
 		}
 	});
 
+	dc1->onClosed([]() {
+		cout << "DataChannel 1: Closed" << endl;
+	});
+
 	dc1->onMessage([](const variant<binary, string> &message) {
 		if (holds_alternative<string>(message)) {
 			cout << "Message 1: " << get<string>(message) << endl;
@@ -195,12 +203,18 @@ void test_connectivity() {
 	});
 
 	auto second1 = pc1.createDataChannel("second");
+
 	second1->onOpen([wsecond1 = make_weak_ptr(second1)]() {
 		if (auto second1 = wsecond1.lock()) {
 			cout << "Second DataChannel 1: Open" << endl;
 			second1->send("Second hello from 1");
 		}
 	});
+
+	second1->onClosed([]() {
+		cout << "Second DataChannel 1: Closed" << endl;
+	});
+
 	second1->onMessage([](const variant<binary, string> &message) {
 		if (holds_alternative<string>(message)) {
 			cout << "Second Message 1: " << get<string>(message) << endl;

+ 12 - 0
test/turn_connectivity.cpp

@@ -127,6 +127,12 @@ void test_turn_connectivity() {
 		cout << "DataChannel 1: Open" << endl;
 		dc1->send("Hello from 1");
 	});
+
+
+	dc1->onClosed([]() {
+		cout << "DataChannel 1: Closed" << endl;
+	});
+
 	dc1->onMessage([](const variant<binary, string> &message) {
 		if (holds_alternative<string>(message)) {
 			cout << "Message 1: " << get<string>(message) << endl;
@@ -198,6 +204,7 @@ void test_turn_connectivity() {
 	});
 
 	auto second1 = pc1.createDataChannel("second");
+
 	second1->onOpen([wsecond1 = make_weak_ptr(second1)]() {
 		auto second1 = wsecond1.lock();
 		if (!second1)
@@ -206,6 +213,11 @@ void test_turn_connectivity() {
 		cout << "Second DataChannel 1: Open" << endl;
 		second1->send("Second hello from 1");
 	});
+
+	second1->onClosed([]() {
+		cout << "Second DataChannel 1: Closed" << endl;
+	});
+
 	second1->onMessage([](const variant<binary, string> &message) {
 		if (holds_alternative<string>(message)) {
 			cout << "Second Message 1: " << get<string>(message) << endl;