Просмотр исходного кода

Resolves paullouisageneau/libdatachannel#1375

Knock, Bob 4 месяцев назад
Родитель
Сommit
408e5f4d9c
2 измененных файлов с 52 добавлено и 45 удалено
  1. 50 44
      src/impl/pollservice.cpp
  2. 2 1
      src/impl/tcptransport.cpp

+ 50 - 44
src/impl/pollservice.cpp

@@ -105,57 +105,63 @@ void PollService::prepare(std::vector<struct pollfd> &pfds, optional<clock::time
 }
 
 void PollService::process(std::vector<struct pollfd> &pfds) {
-	std::unique_lock lock(mMutex);
-	auto it = pfds.begin();
-	if (it != pfds.end()) {
-		mInterrupter->process(*it++);
-	}
-	while (it != pfds.end()) {
-		socket_t sock = it->fd;
-		auto jt = mSocks->find(sock);
-		if (jt != mSocks->end()) {
-			try {
-				auto &entry = jt->second;
-				const auto &params = entry.params;
-
-				if (it->revents & POLLNVAL || it->revents & POLLERR ||
-				    (it->revents & POLLHUP &&
-				     !(it->events & POLLIN))) { // MacOS sets POLLHUP on connection failure
-					PLOG_VERBOSE << "Poll error event";
-					auto callback = std::move(params.callback);
-					mSocks->erase(sock);
-					callback(Event::Error);
-
-				} else if (it->revents & POLLIN || it->revents & POLLOUT || it->revents & POLLHUP) {
-					entry.until = params.timeout
-					                  ? std::make_optional(clock::now() + *params.timeout)
-					                  : nullopt;
-
-					auto callback = params.callback;
-					if (it->revents & POLLIN ||
-					    it->revents & POLLHUP) { // Windows does not set POLLIN on close
-						PLOG_VERBOSE << "Poll in event";
-						callback(Event::In);
-					}
-					if (it->revents & POLLOUT) {
-						PLOG_VERBOSE << "Poll out event";
-						callback(Event::Out);
+	using Callback = decltype(std::declval<Params>().callback);
+	std::vector<std::pair<Callback, Event>> todo;
+	{
+		std::unique_lock lock(mMutex);
+		auto it = pfds.begin();
+		if (it != pfds.end()) {
+			mInterrupter->process(*it++);
+		}
+		while (it != pfds.end()) {
+			socket_t sock = it->fd;
+			auto jt = mSocks->find(sock);
+			if (jt != mSocks->end()) {
+				try {
+					auto &entry = jt->second;
+					const auto &params = entry.params;
+
+					if (it->revents & POLLNVAL || it->revents & POLLERR ||
+					    (it->revents & POLLHUP &&
+					     !(it->events & POLLIN))) { // MacOS sets POLLHUP on connection failure
+						PLOG_VERBOSE << "Poll error event";
+						todo.emplace_back(std::move(params.callback), Event::Error);
+						mSocks->erase(sock);
+					} else if (it->revents & POLLIN || it->revents & POLLOUT || it->revents & POLLHUP) {
+						entry.until = params.timeout
+						                  ? std::make_optional(clock::now() + *params.timeout)
+						                  : nullopt;
+
+						const auto &callback = params.callback; // can't move, we may need it below
+						if (it->revents & POLLIN ||
+						    it->revents & POLLHUP) { // Windows does not set POLLIN on close
+							PLOG_VERBOSE << "Poll in event";
+							todo.emplace_back(callback, Event::In);
+						}
+						if (it->revents & POLLOUT) {
+							PLOG_VERBOSE << "Poll out event";
+							todo.emplace_back(callback, Event::Out);
+						}
+
+					} else if (entry.until && clock::now() >= *entry.until) {
+						PLOG_VERBOSE << "Poll timeout event";
+						todo.emplace_back(std::move(params.callback), Event::Timeout);
+						mSocks->erase(sock);
 					}
 
-				} else if (entry.until && clock::now() >= *entry.until) {
-					PLOG_VERBOSE << "Poll timeout event";
-					auto callback = std::move(params.callback);
+				} catch (const std::exception &e) {
+					PLOG_WARNING << e.what();
 					mSocks->erase(sock);
-					callback(Event::Timeout);
 				}
-
-			} catch (const std::exception &e) {
-				PLOG_WARNING << e.what();
-				mSocks->erase(sock);
 			}
+
+			++it;
 		}
+	}
 
-		++it;
+	// Now perform the callbacks
+	for (auto &[callback, event] : todo) {
+		callback(event);
 	}
 }
 

+ 2 - 1
src/impl/tcptransport.cpp

@@ -324,9 +324,10 @@ void TcpTransport::configureSocket() {
 }
 
 void TcpTransport::setPoll(PollService::Direction direction) {
+	auto weakSelf = weak_from_this();
 	PollService::Instance().add(
 	    mSock, {direction, direction == PollService::Direction::In ? mReadTimeout : nullopt,
-	            std::bind(&TcpTransport::process, this, _1)});
+	    [weakSelf](PollService::Event event) { if (auto self = weakSelf.lock()) { self->process(event); } } });
 }
 
 void TcpTransport::close() {