|
@@ -51,9 +51,10 @@ void PollService::join() {
|
|
}
|
|
}
|
|
|
|
|
|
void PollService::add(socket_t sock, Params params) {
|
|
void PollService::add(socket_t sock, Params params) {
|
|
- std::unique_lock lock(mMutex);
|
|
|
|
|
|
+ assert(sock != INVALID_SOCKET);
|
|
assert(params.callback);
|
|
assert(params.callback);
|
|
|
|
|
|
|
|
+ std::unique_lock lock(mMutex);
|
|
PLOG_VERBOSE << "Registering socket in poll service, direction=" << params.direction;
|
|
PLOG_VERBOSE << "Registering socket in poll service, direction=" << params.direction;
|
|
auto until = params.timeout ? std::make_optional(clock::now() + *params.timeout) : nullopt;
|
|
auto until = params.timeout ? std::make_optional(clock::now() + *params.timeout) : nullopt;
|
|
assert(mSocks);
|
|
assert(mSocks);
|
|
@@ -64,8 +65,9 @@ void PollService::add(socket_t sock, Params params) {
|
|
}
|
|
}
|
|
|
|
|
|
void PollService::remove(socket_t sock) {
|
|
void PollService::remove(socket_t sock) {
|
|
- std::unique_lock lock(mMutex);
|
|
|
|
|
|
+ assert(sock != INVALID_SOCKET);
|
|
|
|
|
|
|
|
+ std::unique_lock lock(mMutex);
|
|
PLOG_VERBOSE << "Unregistering socket in poll service";
|
|
PLOG_VERBOSE << "Unregistering socket in poll service";
|
|
assert(mSocks);
|
|
assert(mSocks);
|
|
mSocks->erase(sock);
|
|
mSocks->erase(sock);
|
|
@@ -102,11 +104,13 @@ void PollService::prepare(std::vector<struct pollfd> &pfds, optional<clock::time
|
|
}
|
|
}
|
|
|
|
|
|
void PollService::process(std::vector<struct pollfd> &pfds) {
|
|
void PollService::process(std::vector<struct pollfd> &pfds) {
|
|
- std::unique_lock lock(mMutex);
|
|
|
|
-
|
|
|
|
auto it = pfds.begin();
|
|
auto it = pfds.begin();
|
|
- mInterrupter->process(*it++);
|
|
|
|
|
|
+ if (it != pfds.end()) {
|
|
|
|
+ std::unique_lock lock(mMutex);
|
|
|
|
+ mInterrupter->process(*it++);
|
|
|
|
+ }
|
|
while (it != pfds.end()) {
|
|
while (it != pfds.end()) {
|
|
|
|
+ std::unique_lock lock(mMutex);
|
|
socket_t sock = it->fd;
|
|
socket_t sock = it->fd;
|
|
auto jt = mSocks->find(sock);
|
|
auto jt = mSocks->find(sock);
|
|
if (jt != mSocks->end()) {
|
|
if (jt != mSocks->end()) {
|
|
@@ -178,7 +182,10 @@ void PollService::runLoop() {
|
|
ret = ::poll(pfds.data(), static_cast<nfds_t>(pfds.size()), timeout);
|
|
ret = ::poll(pfds.data(), static_cast<nfds_t>(pfds.size()), timeout);
|
|
|
|
|
|
PLOG_VERBOSE << "Exiting poll";
|
|
PLOG_VERBOSE << "Exiting poll";
|
|
-
|
|
|
|
|
|
+#ifdef _WIN32
|
|
|
|
+ if (ret == WSAENOTSOCK)
|
|
|
|
+ continue; // prepare again as the fd has been removed
|
|
|
|
+#endif
|
|
} while (ret < 0 && (sockerrno == SEINTR || sockerrno == SEAGAIN));
|
|
} while (ret < 0 && (sockerrno == SEINTR || sockerrno == SEAGAIN));
|
|
|
|
|
|
if (ret < 0)
|
|
if (ret < 0)
|