|
@@ -31,28 +31,34 @@ TcpTransport::TcpTransport(const string &hostname, const string &service, state_
|
|
mThread = std::thread(&TcpTransport::runLoop, this);
|
|
mThread = std::thread(&TcpTransport::runLoop, this);
|
|
}
|
|
}
|
|
|
|
|
|
-TcpTransport::~TcpTransport() { stop(); }
|
|
|
|
|
|
+TcpTransport::~TcpTransport() {
|
|
|
|
+ stop();
|
|
|
|
+ if (mInterruptSock != INVALID_SOCKET)
|
|
|
|
+ ::closesocket(mInterruptSock);
|
|
|
|
+}
|
|
|
|
|
|
bool TcpTransport::stop() {
|
|
bool TcpTransport::stop() {
|
|
if (!Transport::stop())
|
|
if (!Transport::stop())
|
|
return false;
|
|
return false;
|
|
|
|
|
|
|
|
+ PLOG_DEBUG << "Waiting TCP recv thread";
|
|
close();
|
|
close();
|
|
mThread.join();
|
|
mThread.join();
|
|
return true;
|
|
return true;
|
|
}
|
|
}
|
|
|
|
|
|
-bool TcpTransport::send(message_ptr message) { return outgoing(message); }
|
|
|
|
|
|
+bool TcpTransport::send(message_ptr message) {
|
|
|
|
+ if (!message)
|
|
|
|
+ return mSendQueue.empty();
|
|
|
|
|
|
-void TcpTransport::incoming(message_ptr message) { recv(message); }
|
|
|
|
|
|
+ PLOG_VERBOSE << "Send size=" << (message ? message->size() : 0);
|
|
|
|
|
|
-bool TcpTransport::outgoing(message_ptr message) {
|
|
|
|
- if (mSock == INVALID_SOCKET)
|
|
|
|
- throw std::runtime_error("Not connected");
|
|
|
|
|
|
+ return outgoing(message);
|
|
|
|
+}
|
|
|
|
|
|
- if (!message)
|
|
|
|
- return mSendQueue.empty();
|
|
|
|
|
|
+void TcpTransport::incoming(message_ptr message) { recv(message); }
|
|
|
|
|
|
|
|
+bool TcpTransport::outgoing(message_ptr message) {
|
|
// If nothing is pending, try to send directly
|
|
// If nothing is pending, try to send directly
|
|
// It's safe because if the queue is empty, the thread is not sending
|
|
// It's safe because if the queue is empty, the thread is not sending
|
|
if (mSendQueue.empty() && trySendMessage(message))
|
|
if (mSendQueue.empty() && trySendMessage(message))
|
|
@@ -64,6 +70,8 @@ bool TcpTransport::outgoing(message_ptr message) {
|
|
}
|
|
}
|
|
|
|
|
|
void TcpTransport::connect(const string &hostname, const string &service) {
|
|
void TcpTransport::connect(const string &hostname, const string &service) {
|
|
|
|
+ PLOG_DEBUG << "Connecting to " << hostname << ":" << service;
|
|
|
|
+
|
|
struct addrinfo hints = {};
|
|
struct addrinfo hints = {};
|
|
hints.ai_family = AF_UNSPEC;
|
|
hints.ai_family = AF_UNSPEC;
|
|
hints.ai_socktype = SOCK_STREAM;
|
|
hints.ai_socktype = SOCK_STREAM;
|
|
@@ -89,6 +97,8 @@ void TcpTransport::connect(const string &hostname, const string &service) {
|
|
|
|
|
|
void TcpTransport::connect(const sockaddr *addr, socklen_t addrlen) {
|
|
void TcpTransport::connect(const sockaddr *addr, socklen_t addrlen) {
|
|
try {
|
|
try {
|
|
|
|
+ PLOG_DEBUG << "Creating TCP socket";
|
|
|
|
+
|
|
// Create socket
|
|
// Create socket
|
|
mSock = ::socket(addr->sa_family, SOCK_STREAM, IPPROTO_TCP);
|
|
mSock = ::socket(addr->sa_family, SOCK_STREAM, IPPROTO_TCP);
|
|
if (mSock == INVALID_SOCKET)
|
|
if (mSock == INVALID_SOCKET)
|
|
@@ -98,6 +108,15 @@ void TcpTransport::connect(const sockaddr *addr, socklen_t addrlen) {
|
|
if (::ioctlsocket(mSock, FIONBIO, &b) < 0)
|
|
if (::ioctlsocket(mSock, FIONBIO, &b) < 0)
|
|
throw std::runtime_error("Failed to set socket non-blocking mode");
|
|
throw std::runtime_error("Failed to set socket non-blocking mode");
|
|
|
|
|
|
|
|
+ IF_PLOG(plog::debug) {
|
|
|
|
+ char node[MAX_NUMERICNODE_LEN];
|
|
|
|
+ char serv[MAX_NUMERICSERV_LEN];
|
|
|
|
+ if (getnameinfo(addr, addrlen, node, MAX_NUMERICNODE_LEN, serv, MAX_NUMERICSERV_LEN,
|
|
|
|
+ NI_NUMERICHOST | NI_NUMERICSERV) == 0) {
|
|
|
|
+ PLOG_DEBUG << "Trying address " << node << ":" << serv;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
// Initiate connection
|
|
// Initiate connection
|
|
::connect(mSock, addr, addrlen);
|
|
::connect(mSock, addr, addrlen);
|
|
|
|
|
|
@@ -126,6 +145,7 @@ void TcpTransport::connect(const sockaddr *addr, socklen_t addrlen) {
|
|
|
|
|
|
void TcpTransport::close() {
|
|
void TcpTransport::close() {
|
|
if (mSock != INVALID_SOCKET) {
|
|
if (mSock != INVALID_SOCKET) {
|
|
|
|
+ PLOG_DEBUG << "Closing TCP socket";
|
|
::closesocket(mSock);
|
|
::closesocket(mSock);
|
|
mSock = INVALID_SOCKET;
|
|
mSock = INVALID_SOCKET;
|
|
}
|
|
}
|
|
@@ -166,10 +186,9 @@ bool TcpTransport::trySendMessage(message_ptr &message) {
|
|
void TcpTransport::runLoop() {
|
|
void TcpTransport::runLoop() {
|
|
const size_t bufferSize = 4096;
|
|
const size_t bufferSize = 4096;
|
|
|
|
|
|
- changeState(State::Connecting);
|
|
|
|
-
|
|
|
|
// Connect
|
|
// Connect
|
|
try {
|
|
try {
|
|
|
|
+ changeState(State::Connecting);
|
|
connect(mHostname, mService);
|
|
connect(mHostname, mService);
|
|
|
|
|
|
} catch (const std::exception &e) {
|
|
} catch (const std::exception &e) {
|
|
@@ -178,10 +197,12 @@ void TcpTransport::runLoop() {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
- changeState(State::Connected);
|
|
|
|
|
|
|
|
// Receive loop
|
|
// Receive loop
|
|
try {
|
|
try {
|
|
|
|
+ PLOG_INFO << "TCP connected";
|
|
|
|
+ changeState(State::Connected);
|
|
|
|
+
|
|
while (true) {
|
|
while (true) {
|
|
fd_set readfds, writefds;
|
|
fd_set readfds, writefds;
|
|
int n = prepareSelect(readfds, writefds);
|
|
int n = prepareSelect(readfds, writefds);
|