Pārlūkot izejas kodu

Introduced synchronized_stored_callback to support callbacks set after they are triggered

Paul-Louis Ageneau 4 gadi atpakaļ
vecāks
revīzija
ae429410fe
3 mainītis faili ar 54 papildinājumiem un 18 dzēšanām
  1. 46 12
      include/rtc/utils.hpp
  2. 2 1
      src/impl/channel.cpp
  3. 6 5
      src/impl/channel.hpp

+ 46 - 12
include/rtc/utils.hpp

@@ -22,6 +22,8 @@
 #include <functional>
 #include <memory>
 #include <mutex>
+#include <optional>
+#include <tuple>
 
 namespace rtc {
 
@@ -58,40 +60,35 @@ private:
 };
 
 // callback with built-in synchronization
-template <typename... Args> class synchronized_callback final {
+template <typename... Args> class synchronized_callback {
 public:
 	synchronized_callback() = default;
 	synchronized_callback(synchronized_callback &&cb) { *this = std::move(cb); }
 	synchronized_callback(const synchronized_callback &cb) { *this = cb; }
 	synchronized_callback(std::function<void(Args...)> func) { *this = std::move(func); }
-	~synchronized_callback() { *this = nullptr; }
+	virtual ~synchronized_callback() { *this = nullptr; }
 
 	synchronized_callback &operator=(synchronized_callback &&cb) {
 		std::scoped_lock lock(mutex, cb.mutex);
-		callback = std::move(cb.callback);
-		cb.callback = nullptr;
+		set(std::exchange(cb.callback, nullptr));
 		return *this;
 	}
 
 	synchronized_callback &operator=(const synchronized_callback &cb) {
 		std::scoped_lock lock(mutex, cb.mutex);
-		callback = cb.callback;
+		set(cb.callback);
 		return *this;
 	}
 
 	synchronized_callback &operator=(std::function<void(Args...)> func) {
 		std::lock_guard lock(mutex);
-		callback = std::move(func);
+		set(std::move(func));
 		return *this;
 	}
 
 	bool operator()(Args... args) const {
 		std::lock_guard lock(mutex);
-		if (!callback)
-			return false;
-
-		callback(std::move(args)...);
-		return true;
+		return call(std::move(args)...);
 	}
 
 	operator bool() const {
@@ -103,11 +100,48 @@ public:
 		return [this](Args... args) { (*this)(std::move(args)...); };
 	}
 
-private:
+protected:
+	virtual void set(std::function<void(Args...)> func) { callback = std::move(func); }
+	virtual bool call(Args... args) const {
+		if (!callback)
+			return false;
+
+		callback(std::move(args)...);
+		return true;
+	}
+
 	std::function<void(Args...)> callback;
 	mutable std::recursive_mutex mutex;
 };
 
+// callback with built-in synchronization and replay of the last missed call
+template <typename... Args>
+class synchronized_stored_callback final : public synchronized_callback<Args...> {
+public:
+	template <typename... CArgs>
+	synchronized_stored_callback(CArgs &&...cargs)
+	    : synchronized_callback<Args...>(std::forward<CArgs>(cargs)...) {}
+	~synchronized_stored_callback() {}
+
+private:
+	void set(std::function<void(Args...)> func) {
+		synchronized_callback<Args...>::set(func);
+		if (func && stored) {
+			std::apply(func, std::move(*stored));
+			stored.reset();
+		}
+	}
+
+	bool call(Args... args) const {
+		if (!synchronized_callback<Args...>::call(args...))
+			stored.emplace(std::move(args)...);
+
+		return true;
+	}
+
+	mutable std::optional<std::tuple<Args...>> stored;
+};
+
 // pimpl base class
 template <typename T> using impl_ptr = std::shared_ptr<T>;
 template <typename T> class CheshireCat {

+ 2 - 1
src/impl/channel.cpp

@@ -63,12 +63,13 @@ void Channel::resetOpenCallback() {
 }
 
 void Channel::resetCallbacks() {
+	mOpenTriggered = false;
 	openCallback = nullptr;
 	closedCallback = nullptr;
 	errorCallback = nullptr;
-	messageCallback = nullptr;
 	availableCallback = nullptr;
 	bufferedAmountLowCallback = nullptr;
+	messageCallback = nullptr;
 }
 
 } // namespace rtc::impl

+ 6 - 5
src/impl/channel.hpp

@@ -42,12 +42,13 @@ struct Channel {
 	void resetOpenCallback();
 	void resetCallbacks();
 
-	synchronized_callback<> openCallback;
-	synchronized_callback<> closedCallback;
-	synchronized_callback<string> errorCallback;
+	synchronized_stored_callback<> openCallback;
+	synchronized_stored_callback<> closedCallback;
+	synchronized_stored_callback<string> errorCallback;
+	synchronized_stored_callback<> availableCallback;
+	synchronized_stored_callback<> bufferedAmountLowCallback;
+
 	synchronized_callback<message_variant> messageCallback;
-	synchronized_callback<> availableCallback;
-	synchronized_callback<> bufferedAmountLowCallback;
 
 	std::atomic<size_t> bufferedAmount = 0;
 	std::atomic<size_t> bufferedAmountLowThreshold = 0;