NetworkServer.cpp 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575
  1. /* Copyright The kNet Project.
  2. Licensed under the Apache License, Version 2.0 (the "License");
  3. you may not use this file except in compliance with the License.
  4. You may obtain a copy of the License at
  5. http://www.apache.org/licenses/LICENSE-2.0
  6. Unless required by applicable law or agreed to in writing, software
  7. distributed under the License is distributed on an "AS IS" BASIS,
  8. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  9. See the License for the specific language governing permissions and
  10. limitations under the License. */
  11. /** @file NetworkServer.cpp
  12. @brief */
  13. #ifdef KNET_USE_BOOST
  14. #include <boost/thread/thread.hpp>
  15. #endif
  16. #ifdef KNET_UNIX
  17. #include <unistd.h>
  18. #endif
  19. #include "kNet/DebugMemoryLeakCheck.h"
  20. #include "kNet/Network.h"
  21. #include "kNet/NetworkServer.h"
  22. #include "kNet/TCPMessageConnection.h"
  23. #include "kNet/UDPMessageConnection.h"
  24. #include "kNet/Datagram.h"
  25. #include "kNet/NetworkWorkerThread.h"
  26. #include "kNet/NetworkLogging.h"
  27. #include "kNet/Clock.h"
  28. #include <iostream>
  29. #include <sstream>
  30. namespace kNet
  31. {
  32. NetworkServer::NetworkServer(Network *owner_, std::vector<Socket *> listenSockets_)
  33. :listenSockets(listenSockets_),
  34. owner(owner_),
  35. workerThread(0),
  36. #ifdef KNET_THREAD_CHECKING_ENABLED
  37. workerThreadId(Thread::NullThreadId()),
  38. #endif
  39. acceptNewConnections(true),
  40. networkServerListener(0),
  41. udpConnectionAttempts(64)
  42. {
  43. assert(owner);
  44. assert(!listenSockets.empty());
  45. }
  46. NetworkServer::~NetworkServer()
  47. {
  48. KNET_LOG(LogObjectAlloc, "Deleting NetworkServer %p.", this);
  49. CloseSockets();
  50. }
  51. void NetworkServer::RegisterServerListener(INetworkServerListener *listener)
  52. {
  53. networkServerListener = listener;
  54. }
  55. void NetworkServer::SetAcceptNewConnections(bool acceptNewConnections_)
  56. {
  57. acceptNewConnections = acceptNewConnections_;
  58. }
  59. void NetworkServer::SetWorkerThread(NetworkWorkerThread *thread) // [main thread]
  60. {
  61. workerThread = thread;
  62. #ifdef KNET_THREAD_CHECKING_ENABLED
  63. workerThreadId = thread ? thread->ThreadObject().Id() : Thread::NullThreadId();
  64. #endif
  65. }
  66. void NetworkServer::CloseSockets()
  67. {
  68. KNET_LOG(LogInfo, "NetworkServer::CloseSockets(): Network server is terminated. (%p)", this);
  69. assert(owner);
  70. acceptNewConnections = false;
  71. for(size_t i = 0; i < listenSockets.size(); ++i)
  72. owner->DeleteSocket(listenSockets[i]);
  73. // Now forget all sockets - not getting them back in any way.
  74. listenSockets.clear();
  75. }
  76. Socket *NetworkServer::AcceptConnections(Socket *listenSocket)
  77. {
  78. if (!listenSocket || !listenSocket->Connected())
  79. return 0;
  80. sockaddr_in remoteAddress;
  81. memset(&remoteAddress, 0, sizeof(remoteAddress));
  82. socklen_t remoteAddressLen = sizeof(remoteAddress);
  83. SOCKET &listenSock = listenSocket->GetSocketHandle();
  84. SOCKET acceptSocket = accept(listenSock, (sockaddr*)&remoteAddress, &remoteAddressLen);
  85. if (acceptSocket == KNET_ACCEPT_FAILURE)
  86. {
  87. int error = Network::GetLastError();
  88. if (error != KNET_EWOULDBLOCK)
  89. {
  90. KNET_LOG(LogError, "NetworkServer::AcceptConnections: accept failed: %s", Network::GetErrorString(error).c_str());
  91. closesocket(listenSock);
  92. listenSock = INVALID_SOCKET;
  93. }
  94. return 0;
  95. }
  96. EndPoint remoteEndPoint = EndPoint::FromSockAddrIn(remoteAddress);
  97. std::string remoteHostName = remoteEndPoint.IPToString();
  98. KNET_LOG(LogInfo, "Accepted incoming TCP connection from %s:%d.", remoteHostName.c_str(), (int)remoteEndPoint.port);
  99. EndPoint localEndPoint;
  100. sockaddr_in localSockAddr;
  101. socklen_t namelen = sizeof(localSockAddr);
  102. int sockRet = getsockname(acceptSocket, (sockaddr*)&localSockAddr, &namelen); // Note: This works only if family==INETv4
  103. if (sockRet != 0)
  104. KNET_LOG(LogError, "getsockname failed!");
  105. localEndPoint = EndPoint::FromSockAddrIn(localSockAddr);
  106. std::string localHostName = owner->LocalAddress();
  107. const size_t maxTcpSendSize = 65536;
  108. Socket *socket = owner->StoreSocket(Socket(acceptSocket, localEndPoint, localHostName.c_str(), remoteEndPoint, remoteHostName.c_str(), SocketOverTCP, ServerClientSocket, maxTcpSendSize));
  109. socket->SetBlocking(false);
  110. return socket;
  111. }
  112. void NetworkServer::CleanupDeadConnections()
  113. {
  114. // When we acquire the list of client connections, don't hold on to it for long, so that the worker thread can properly detach from it.
  115. ConnectionMap clientsMap = *clients.Acquire();
  116. // Clean up all disconnected/timed out connections.
  117. ConnectionMap::iterator iter = clientsMap.begin();
  118. while(iter != clientsMap.end())
  119. {
  120. ConnectionMap::iterator next = iter;
  121. ++next;
  122. if (!iter->second->Connected())
  123. {
  124. KNET_LOG(LogInfo, "Client %s disconnected.", iter->second->ToString().c_str());
  125. if (networkServerListener)
  126. networkServerListener->ClientDisconnected(iter->second);
  127. if (iter->second->GetSocket() && iter->second->GetSocket()->TransportLayer() == SocketOverTCP)
  128. owner->CloseConnection(iter->second);
  129. {
  130. Lockable<ConnectionMap>::LockType clientsLock = clients.Acquire();
  131. clientsLock->erase(iter->first);
  132. }
  133. }
  134. iter = next;
  135. }
  136. }
  137. void NetworkServer::Process()
  138. {
  139. CleanupDeadConnections();
  140. for(size_t i = 0; i < listenSockets.size(); ++i)
  141. {
  142. Socket *listen = listenSockets[i];
  143. if (listen->TransportLayer() == SocketOverTCP)
  144. {
  145. // Accept the first inbound connection.
  146. Socket *client = AcceptConnections(listen);
  147. if (client)
  148. {
  149. if (!client->Connected())
  150. KNET_LOG(LogError, "Warning: Accepted an already closed connection!");
  151. KNET_LOG(LogInfo, "Client connected from %s.", client->ToString().c_str());
  152. // Build a MessageConnection on top of the raw socket.
  153. assert(listen->TransportLayer() == SocketOverTCP);
  154. Ptr(MessageConnection) clientConnection = new TCPMessageConnection(owner, this, client, ConnectionOK);
  155. assert(owner);
  156. owner->AssignConnectionToWorkerThread(clientConnection);
  157. if (networkServerListener)
  158. networkServerListener->NewConnectionEstablished(clientConnection);
  159. {
  160. PolledTimer timer;
  161. Lockable<ConnectionMap>::LockType clientsLock = clients.Acquire();
  162. (*clientsLock)[clientConnection->RemoteEndPoint()] = clientConnection;
  163. KNET_LOG(LogWaits, "NetworkServer::Process: Adding new accepted TCP connection to connection list took %f msecs.",
  164. timer.MSecsElapsed());
  165. }
  166. owner->NewMessageConnectionCreated(clientConnection);
  167. }
  168. }
  169. }
  170. // Note that the above loop will only accept one new connection/socket/iteration, so if there are multiple
  171. // pending new connections, they will only get accepted at a rate of one per each frame.
  172. // Process a new UDP connection attempt.
  173. ConnectionAttemptDescriptor *desc = udpConnectionAttempts.Front();
  174. if (desc)
  175. {
  176. ProcessNewUDPConnectionAttempt(desc->listenSocket, desc->peer, (const char *)desc->data.data, desc->data.size);
  177. udpConnectionAttempts.PopFront();
  178. }
  179. // Process all new inbound data for each connection handled by this server.
  180. ConnectionMap clientMap = *clients.Acquire();
  181. for(ConnectionMap::iterator iter = clientMap.begin(); iter != clientMap.end(); ++iter)
  182. iter->second->Process();
  183. }
  184. void NetworkServer::ReadUDPSocketData(Socket *listenSocket) // [worker thread]
  185. {
  186. using namespace std;
  187. assert(listenSocket);
  188. OverlappedTransferBuffer *recvData = listenSocket->BeginReceive();
  189. if (!recvData)
  190. return; // No datagram available, return.
  191. if (recvData->bytesContains == 0)
  192. {
  193. listenSocket->EndReceive(recvData);
  194. KNET_LOG(LogError, "Received 0 bytes of data in NetworkServer::ReadUDPSocketData!");
  195. return;
  196. }
  197. EndPoint endPoint = EndPoint::FromSockAddrIn(recvData->from); // This conversion is quite silly, perhaps it could be removed to gain performance?
  198. KNET_LOG(LogData, "Received a datagram of size %d to socket %s from endPoint %s.", recvData->bytesContains, listenSocket->ToString().c_str(),
  199. endPoint.ToString().c_str());
  200. PolledTimer timer;
  201. MessageConnection *receiverConnection = 0;
  202. {
  203. Lockable<ConnectionMap>::LockType clientsLock = clients.Acquire();
  204. if (timer.MSecsElapsed() > 50.f)
  205. {
  206. KNET_LOG(LogWaits, "NetworkServer::ReadUDPSocketData: Accessing the connection list in UDP server receive code took %f msecs.",
  207. timer.MSecsElapsed());
  208. }
  209. ConnectionMap::iterator iter = clientsLock->find(endPoint); ///\todo HashTable for performance.
  210. if (iter != clientsLock->end())
  211. receiverConnection = iter->second;
  212. }
  213. if (receiverConnection)
  214. {
  215. // If the datagram came from a known endpoint, pass it to the connection object that handles that endpoint.
  216. UDPMessageConnection *udpConnection = dynamic_cast<UDPMessageConnection *>(receiverConnection);
  217. if (udpConnection)
  218. udpConnection->QueueInboundDatagram(recvData->buffer.buf, recvData->bytesContains);
  219. else
  220. KNET_LOG(LogError, "Critical! UDP socket data received into a TCP socket!");
  221. }
  222. else
  223. {
  224. // The endpoint for this datagram is not known, deserialize it as a new connection attempt packet.
  225. EnqueueNewUDPConnectionAttempt(listenSocket, endPoint, recvData->buffer.buf, recvData->bytesContains);
  226. }
  227. listenSocket->EndReceive(recvData);
  228. }
  229. void NetworkServer::EnqueueNewUDPConnectionAttempt(Socket *listenSocket, const EndPoint &endPoint, const char *data, size_t numBytes)
  230. {
  231. ConnectionAttemptDescriptor desc;
  232. desc.data.size = std::min<int>(cDatagramBufferSize, numBytes);
  233. memcpy(&desc.data.data[0], data, desc.data.size);
  234. desc.peer = endPoint;
  235. desc.listenSocket = listenSocket;
  236. ///\todo Check IP banlist.
  237. ///\todo Check that the maximum number of active concurrent connections is not exceeded.
  238. bool success = udpConnectionAttempts.Insert(desc);
  239. if (!success)
  240. KNET_LOG(LogError, "Too many connection attempts!");
  241. else
  242. KNET_LOG(LogInfo, "Queued new connection attempt from %s.", endPoint.ToString().c_str());
  243. }
  244. bool NetworkServer::ProcessNewUDPConnectionAttempt(Socket *listenSocket, const EndPoint &endPoint, const char *data, size_t numBytes)
  245. {
  246. KNET_LOG(LogInfo, "New inbound connection attempt from %s with datagram of size %d.", endPoint.ToString().c_str(), (int)numBytes);
  247. if (!acceptNewConnections)
  248. {
  249. KNET_LOG(LogError, "Ignored a new connection attempt since server is set not to accept new connections.");
  250. return false;
  251. }
  252. // Pass the datagram contents to a callback that decides whether this connection is allowed.
  253. if (networkServerListener)
  254. {
  255. bool connectionAccepted = networkServerListener->NewConnectionAttempt(endPoint, data, numBytes);
  256. if (!connectionAccepted)
  257. {
  258. KNET_LOG(LogError, "Server listener did not accept the new connection.");
  259. return false;
  260. }
  261. }
  262. ///\todo Check IP banlist.
  263. ///\todo Check that the maximum number of active concurrent connections is not exceeded.
  264. std::string remoteHostName = endPoint.IPToString();
  265. // Accept the connection and create a new UDP socket that communicates to that endpoint.
  266. Socket *socket = owner->CreateUDPSlaveSocket(listenSocket, endPoint, remoteHostName.c_str());
  267. if (!socket)
  268. {
  269. KNET_LOG(LogError, "Network::ConnectUDP failed! Cannot accept new UDP connection.");
  270. return false;
  271. }
  272. UDPMessageConnection *udpConnection = new UDPMessageConnection(owner, this, socket, ConnectionOK);
  273. Ptr(MessageConnection) connection(udpConnection);
  274. {
  275. PolledTimer timer;
  276. Lockable<ConnectionMap>::LockType clientsLock = clients.Acquire();
  277. if (clientsLock->find(endPoint) == clientsLock->end())
  278. (*clientsLock)[endPoint] = connection;
  279. else
  280. KNET_LOG(LogError, "NetworkServer::ProcessNewUDPConnectionAttempt: Trying to overwrite an old connection with a new one! Discarding connection attempt datagram!",
  281. timer.MSecsElapsed());
  282. KNET_LOG(LogWaits, "NetworkServer::ProcessNewUDPConnectionAttempt: Accessing the connection list took %f msecs.",
  283. timer.MSecsElapsed());
  284. }
  285. // Pass the MessageConnection to the main application so it can hook the inbound packet stream.
  286. if (networkServerListener)
  287. networkServerListener->NewConnectionEstablished(connection);
  288. connection->SendPingRequestMessage(false);
  289. owner->AssignConnectionToWorkerThread(connection);
  290. owner->NewMessageConnectionCreated(connection);
  291. KNET_LOG(LogInfo, "Accepted new UDP connection.");
  292. return true;
  293. }
  294. void NetworkServer::BroadcastMessage(const NetworkMessage &msg, MessageConnection *exclude)
  295. {
  296. PolledTimer timer;
  297. Lockable<ConnectionMap>::LockType clientsLock = clients.Acquire();
  298. if (timer.MSecsElapsed() >= 50.f)
  299. {
  300. KNET_LOG(LogWaits, "NetworkServer::BroadcastMessage: Accessing the connection list took %f msecs.",
  301. timer.MSecsElapsed());
  302. }
  303. for(ConnectionMap::iterator iter = clientsLock->begin(); iter != clientsLock->end(); ++iter)
  304. {
  305. MessageConnection *connection = iter->second;
  306. if (connection == exclude)
  307. continue;
  308. SendMessage(msg, *connection);
  309. }
  310. }
  311. void NetworkServer::BroadcastMessage(unsigned long id, bool reliable, bool inOrder, unsigned long priority,
  312. unsigned long contentID, const char *data, size_t numBytes,
  313. MessageConnection *exclude)
  314. {
  315. PolledTimer timer;
  316. Lockable<ConnectionMap>::LockType clientsLock = clients.Acquire();
  317. if (timer.MSecsElapsed() >= 50.f)
  318. {
  319. KNET_LOG(LogWaits, "NetworkServer::BroadcastMessage: Accessing the connection list took %f msecs.",
  320. timer.MSecsElapsed());
  321. }
  322. for(ConnectionMap::iterator iter = clientsLock->begin(); iter != clientsLock->end(); ++iter)
  323. {
  324. MessageConnection *connection = iter->second;
  325. assert(connection);
  326. if (connection == exclude || !connection->IsWriteOpen())
  327. continue;
  328. NetworkMessage *msg = connection->StartNewMessage(id, numBytes);
  329. msg->reliable = reliable;
  330. msg->inOrder = inOrder;
  331. msg->priority = priority;
  332. msg->contentID = contentID;
  333. assert(msg->data);
  334. assert(msg->Size() == numBytes);
  335. memcpy(msg->data, data, numBytes);
  336. connection->EndAndQueueMessage(msg);
  337. }
  338. }
  339. void NetworkServer::SendMessage(const NetworkMessage &msg, MessageConnection &destination)
  340. {
  341. if (!destination.IsWriteOpen())
  342. return;
  343. NetworkMessage *cloned = destination.StartNewMessage(msg.id);
  344. *cloned = msg;
  345. destination.EndAndQueueMessage(cloned);
  346. }
  347. void NetworkServer::DisconnectAllClients()
  348. {
  349. SetAcceptNewConnections(false);
  350. PolledTimer timer;
  351. Lockable<ConnectionMap>::LockType clientsLock = clients.Acquire();
  352. KNET_LOG(LogWaits, "NetworkServer::DisconnectAllClients: Accessing the connection list took %f msecs.",
  353. timer.MSecsElapsed());
  354. for(ConnectionMap::iterator iter = clientsLock->begin(); iter != clientsLock->end(); ++iter)
  355. iter->second->Disconnect(0); // Do not wait for any client.
  356. }
  357. void NetworkServer::Close(int disconnectWaitMilliseconds)
  358. {
  359. DisconnectAllClients();
  360. ///\todo Re-implement this function to remove the monolithic Sleep here. Instead of this,
  361. /// wait for the individual connections to finish.
  362. if (GetConnections().size() > 0)
  363. {
  364. Clock::Sleep(disconnectWaitMilliseconds);
  365. KNET_LOG(LogVerbose, "NetworkServer::Close: Waited a fixed period of %d msecs for all connections to disconnect.",
  366. disconnectWaitMilliseconds);
  367. }
  368. PolledTimer timer;
  369. Lockable<ConnectionMap>::LockType clientsLock = clients.Acquire();
  370. KNET_LOG(LogWaits, "NetworkServer::Close: Accessing the connection list took %f msecs.",
  371. timer.MSecsElapsed());
  372. for(ConnectionMap::iterator iter = clientsLock->begin(); iter != clientsLock->end(); ++iter)
  373. iter->second->Close(0); // Do not wait for any client.
  374. }
  375. void NetworkServer::RunModalServer()
  376. {
  377. assert(this);
  378. ///\todo Loop until StopModalServer() is called.
  379. for(;;)
  380. {
  381. Process();
  382. ///\todo WSACreateEvent/WSAWaitForMultipleEvents for improved responsiveness and performance.
  383. Clock::Sleep(1);
  384. }
  385. }
  386. void NetworkServer::ConnectionClosed(MessageConnection *connection)
  387. {
  388. PolledTimer timer;
  389. Lockable<ConnectionMap>::LockType clientsLock = clients.Acquire();
  390. KNET_LOG(LogWaits, "NetworkServer::ConnectionClosed: Accessing the connection list took %f msecs.",
  391. timer.MSecsElapsed());
  392. for(ConnectionMap::iterator iter = clientsLock->begin(); iter != clientsLock->end(); ++iter)
  393. if (iter->second == connection)
  394. {
  395. if (networkServerListener)
  396. networkServerListener->ClientDisconnected(connection);
  397. if (connection->GetSocket() && connection->GetSocket()->TransportLayer() == SocketOverTCP)
  398. {
  399. owner->DeleteSocket(connection->socket);
  400. connection->socket = 0;
  401. }
  402. clientsLock->erase(iter);
  403. return;
  404. }
  405. KNET_LOG(LogError, "Unknown MessageConnection passed to NetworkServer::Disconnect!");
  406. }
  407. std::vector<Socket *> &NetworkServer::ListenSockets()
  408. {
  409. return listenSockets;
  410. }
  411. NetworkServer::ConnectionMap NetworkServer::GetConnections()
  412. {
  413. PolledTimer timer;
  414. Lockable<ConnectionMap>::LockType lock = clients.Acquire();
  415. if (timer.MSecsElapsed() > 50.f)
  416. {
  417. KNET_LOG(LogWaits, "NetworkServer::GetConnections: Accessing the connection list took %f msecs.",
  418. timer.MSecsElapsed());
  419. }
  420. return *lock;
  421. }
  422. int NetworkServer::NumConnections() const
  423. {
  424. int numConnections = 0;
  425. Lockable<ConnectionMap>::ConstLockType lock = clients.Acquire();
  426. for(ConnectionMap::const_iterator iter = lock->begin(); iter != lock->end(); ++iter)
  427. {
  428. const MessageConnection *connection = iter->second.ptr();
  429. if (connection && (connection->IsPending() || connection->IsReadOpen() || connection->IsWriteOpen()))
  430. ++numConnections;
  431. }
  432. return numConnections;
  433. }
  434. std::string NetworkServer::ToString() const
  435. {
  436. bool isUdp = false;
  437. bool isTcp = false;
  438. for(size_t i = 0; i < listenSockets.size(); ++i)
  439. if (listenSockets[i]->TransportLayer() == SocketOverUDP)
  440. isUdp = true;
  441. else
  442. isTcp = true;
  443. std::stringstream ss;
  444. if (isUdp && isTcp)
  445. ss << "TCP+UDP server";
  446. else if (isUdp)
  447. ss << "UDP server";
  448. else if (isTcp)
  449. ss << "TCP server";
  450. else ss << "Server (no listen sockets open)";
  451. if (listenSockets.size() == 1)
  452. {
  453. int port = (int)listenSockets[0]->LocalPort();
  454. ss << " at local port " << port;
  455. }
  456. else if (listenSockets.size() > 1)
  457. {
  458. ss << " (" << (int)listenSockets.size() << " listen sockets at local ports ";
  459. for(size_t i = 0; i < listenSockets.size() && i < 3; ++i)
  460. {
  461. if (i > 0)
  462. ss << ", ";
  463. ss << listenSockets[i]->LocalPort();
  464. }
  465. if (listenSockets.size() > 3)
  466. ss << ", ...";
  467. ss << ")";
  468. }
  469. ss << ": ";
  470. int numConnections = 0;
  471. {
  472. Lockable<ConnectionMap>::ConstLockType lock = clients.Acquire();
  473. numConnections = lock->size();
  474. }
  475. ss << numConnections << " connections.";
  476. if (!acceptNewConnections)
  477. ss << " (not accepting new connections)";
  478. ///\todo Add note about stealth mode.
  479. return ss.str();
  480. }
  481. } // ~kNet