|
@@ -26,8 +26,13 @@
|
|
#include <unistd.h>
|
|
#include <unistd.h>
|
|
#endif
|
|
#endif
|
|
|
|
|
|
|
|
+#include <chrono>
|
|
|
|
+
|
|
namespace rtc::impl {
|
|
namespace rtc::impl {
|
|
|
|
|
|
|
|
+using namespace std::chrono_literals;
|
|
|
|
+using std::chrono::milliseconds;
|
|
|
|
+
|
|
TcpTransport::TcpTransport(string hostname, string service, state_callback callback)
|
|
TcpTransport::TcpTransport(string hostname, string service, state_callback callback)
|
|
: Transport(nullptr, std::move(callback)), mIsActive(true), mHostname(std::move(hostname)),
|
|
: Transport(nullptr, std::move(callback)), mIsActive(true), mHostname(std::move(hostname)),
|
|
mService(std::move(service)) {
|
|
mService(std::move(service)) {
|
|
@@ -192,13 +197,11 @@ void TcpTransport::connect(const sockaddr *addr, socklen_t addrlen) {
|
|
}
|
|
}
|
|
|
|
|
|
while (true) {
|
|
while (true) {
|
|
- fd_set writefds;
|
|
|
|
- FD_ZERO(&writefds);
|
|
|
|
- FD_SET(mSock, &writefds);
|
|
|
|
- struct timeval tv;
|
|
|
|
- tv.tv_sec = 10; // TODO: Make the timeout configurable
|
|
|
|
- tv.tv_usec = 0;
|
|
|
|
- ret = ::select(SOCKET_TO_INT(mSock) + 1, NULL, &writefds, NULL, &tv);
|
|
|
|
|
|
+ struct pollfd pfd[1];
|
|
|
|
+ pfd[0].fd = mSock;
|
|
|
|
+ pfd[0].events = POLLOUT;
|
|
|
|
+ milliseconds timeout = 10s; // TODO: Make the timeout configurable
|
|
|
|
+ int ret = ::poll(pfd, 1, int(timeout.count()));
|
|
|
|
|
|
if (ret < 0) {
|
|
if (ret < 0) {
|
|
if (sockerrno == SEINTR || sockerrno == SEAGAIN) // interrupted
|
|
if (sockerrno == SEINTR || sockerrno == SEAGAIN) // interrupted
|
|
@@ -207,7 +210,11 @@ void TcpTransport::connect(const sockaddr *addr, socklen_t addrlen) {
|
|
throw std::runtime_error("Failed to wait for socket connection");
|
|
throw std::runtime_error("Failed to wait for socket connection");
|
|
}
|
|
}
|
|
|
|
|
|
- if (ret == 0) {
|
|
|
|
|
|
+ if (pfd[0].revents & POLLNVAL || pfd[0].revents & POLLERR) {
|
|
|
|
+ throw std::runtime_error("Error while waiting for socket connection");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (!(pfd[0].revents & POLLOUT)) {
|
|
std::ostringstream msg;
|
|
std::ostringstream msg;
|
|
msg << "TCP connection to " << node << ":" << serv << " timed out";
|
|
msg << "TCP connection to " << node << ":" << serv << " timed out";
|
|
throw std::runtime_error(msg.str());
|
|
throw std::runtime_error(msg.str());
|
|
@@ -310,29 +317,28 @@ void TcpTransport::runLoop() {
|
|
|
|
|
|
while (true) {
|
|
while (true) {
|
|
std::unique_lock lock(mSockMutex);
|
|
std::unique_lock lock(mSockMutex);
|
|
|
|
+
|
|
if (mSock == INVALID_SOCKET)
|
|
if (mSock == INVALID_SOCKET)
|
|
break;
|
|
break;
|
|
|
|
|
|
- fd_set readfds, writefds;
|
|
|
|
- FD_ZERO(&readfds);
|
|
|
|
- FD_ZERO(&writefds);
|
|
|
|
- FD_SET(mSock, &readfds);
|
|
|
|
- if (!mSendQueue.empty())
|
|
|
|
- FD_SET(mSock, &writefds);
|
|
|
|
-
|
|
|
|
- int n = std::max(mInterrupter.prepare(readfds), SOCKET_TO_INT(mSock) + 1);
|
|
|
|
-
|
|
|
|
- struct timeval tv;
|
|
|
|
- tv.tv_sec = 10;
|
|
|
|
- tv.tv_usec = 0;
|
|
|
|
|
|
+ struct pollfd pfd[2];
|
|
|
|
+ pfd[0].fd = mSock;
|
|
|
|
+ pfd[0].events = !mSendQueue.empty() ? (POLLIN | POLLOUT) : POLLIN;
|
|
|
|
+ mInterrupter.prepare(pfd[1]);
|
|
|
|
+ milliseconds timeout = 10s;
|
|
lock.unlock();
|
|
lock.unlock();
|
|
- int ret = ::select(n, &readfds, &writefds, NULL, &tv);
|
|
|
|
|
|
+ int ret = ::poll(pfd, 2, int(timeout.count()));
|
|
lock.lock();
|
|
lock.lock();
|
|
|
|
+
|
|
if (mSock == INVALID_SOCKET)
|
|
if (mSock == INVALID_SOCKET)
|
|
break;
|
|
break;
|
|
|
|
|
|
if (ret < 0) {
|
|
if (ret < 0) {
|
|
- throw std::runtime_error("Failed to wait on socket");
|
|
|
|
|
|
+ if (sockerrno == SEINTR || sockerrno == SEAGAIN) // interrupted
|
|
|
|
+ continue;
|
|
|
|
+ else
|
|
|
|
+ throw std::runtime_error("Failed to wait on socket");
|
|
|
|
+
|
|
} else if (ret == 0) {
|
|
} else if (ret == 0) {
|
|
PLOG_VERBOSE << "TCP is idle";
|
|
PLOG_VERBOSE << "TCP is idle";
|
|
lock.unlock(); // unlock now since the upper layer might send on incoming
|
|
lock.unlock(); // unlock now since the upper layer might send on incoming
|
|
@@ -340,10 +346,15 @@ void TcpTransport::runLoop() {
|
|
continue;
|
|
continue;
|
|
}
|
|
}
|
|
|
|
|
|
- if (FD_ISSET(mSock, &writefds))
|
|
|
|
|
|
+ if (pfd[0].revents & POLLNVAL || pfd[0].revents & POLLERR) {
|
|
|
|
+ throw std::runtime_error("Error while waiting for socket connection");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (pfd[0].revents & POLLOUT) {
|
|
trySendQueue();
|
|
trySendQueue();
|
|
|
|
+ }
|
|
|
|
|
|
- if (FD_ISSET(mSock, &readfds)) {
|
|
|
|
|
|
+ if (pfd[0].revents & POLLIN) {
|
|
char buffer[bufferSize];
|
|
char buffer[bufferSize];
|
|
int len = ::recv(mSock, buffer, bufferSize, 0);
|
|
int len = ::recv(mSock, buffer, bufferSize, 0);
|
|
if (len < 0) {
|
|
if (len < 0) {
|