MessageConnection.cpp 45 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342
  1. /* Copyright The kNet Project.
  2. Licensed under the Apache License, Version 2.0 (the "License");
  3. you may not use this file except in compliance with the License.
  4. You may obtain a copy of the License at
  5. http://www.apache.org/licenses/LICENSE-2.0
  6. Unless required by applicable law or agreed to in writing, software
  7. distributed under the License is distributed on an "AS IS" BASIS,
  8. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  9. See the License for the specific language governing permissions and
  10. limitations under the License. */
  11. /** @file MessageConnection.cpp
  12. @brief */
  13. #include <algorithm>
  14. #include <iostream>
  15. #include <cassert>
  16. #ifdef KNET_USE_BOOST
  17. #include <boost/thread/thread.hpp>
  18. #endif
  19. #include "kNet/Allocator.h"
  20. #include "kNet/DebugMemoryLeakCheck.h"
  21. #include "kNet/MessageConnection.h"
  22. #include "kNet/PolledTimer.h"
  23. #include "kNet/Sort.h"
  24. #include "kNet/BitOps.h"
  25. #include "kNet/Network.h"
  26. #include "kNet/NetworkLogging.h"
  27. #include "kNet/DataSerializer.h"
  28. #include "kNet/DataDeserializer.h"
  29. #include "kNet/VLEPacker.h"
  30. #include "kNet/FragmentedTransferManager.h"
  31. #include "kNet/NetworkServer.h"
  32. #include "kNet/Clock.h"
  33. #include "kNet/NetworkWorkerThread.h"
  34. using namespace std;
  35. namespace
  36. {
  37. /// The interval at which we send ping messages.
  38. ///\todo Make this user-defineable.
  39. const float pingIntervalMSecs = 3.5 * 1000.f;
  40. /// The interval at which we update the internal statistics fields.
  41. const float statsRefreshIntervalMSecs = 1000.f;
  42. /// The time interval after which, if we don't get a response to a PingRequest message, the connection is declared lost.
  43. ///\todo Make this user-defineable.
  44. const float connectionLostTimeout = 15.f * 1000.f;
  45. const float cConnectTimeOutMSecs = 15 * 1000.f; ///< \todo Actually use this time limit.
  46. const float cDisconnectTimeOutMSecs = 5 * 1000.f; ///< \todo Actually use this time limit.
  47. }
  48. namespace kNet
  49. {
  50. void AppendU8ToVector(std::vector<char> &data, unsigned long value)
  51. {
  52. data.insert(data.end(), (const char *)&value, (const char *)&value + 1);
  53. }
  54. void AppendU32ToVector(std::vector<char> &data, unsigned long value)
  55. {
  56. data.insert(data.end(), (const char *)&value, (const char *)&value + 4);
  57. }
  58. std::string ConnectionStateToString(ConnectionState state)
  59. {
  60. switch(state)
  61. {
  62. case ConnectionPending: return "ConnectionPending";
  63. case ConnectionOK: return "ConnectionOK";
  64. case ConnectionDisconnecting: return "ConnectionDisconnecting";
  65. case ConnectionPeerClosed: return "ConnectionPeerClosed";
  66. case ConnectionClosed: return "ConnectionClosed";
  67. default: assert(false); return "(Unknown connection state)";
  68. }
  69. }
  70. MessageConnection::MessageConnection(Network *owner_, NetworkServer *ownerServer_, Socket *socket_, ConnectionState startingState)
  71. :owner(owner_), ownerServer(ownerServer_), workerThread(0),
  72. #ifdef KNET_THREAD_CHECKING_ENABLED
  73. workerThreadId(Thread::NullThreadId()),
  74. #endif
  75. outboundAcceptQueue(16*1024), inboundMessageQueue(16*1024),
  76. #ifdef KNET_NO_MAXHEAP
  77. outboundQueue(16 * 1024),
  78. #endif
  79. inboundMessageHandler(0), socket(socket_),
  80. bOutboundSendsPaused(false),
  81. rtt(0.f),
  82. lastHeardTime(Clock::Tick()),
  83. packetsInPerSec(0), packetsOutPerSec(0),
  84. msgsInPerSec(0), msgsOutPerSec(0),
  85. bytesInPerSec(0), bytesOutPerSec(0),
  86. bytesInTotal(0), bytesOutTotal(0),
  87. outboundMessageNumberCounter(0),
  88. outboundReliableMessageNumberCounter(0)
  89. {
  90. connectionState = startingState;
  91. networkSendSimulator.owner = this;
  92. eventMsgsOutAvailable = CreateNewEvent(EventWaitSignal);
  93. assert(eventMsgsOutAvailable.IsValid());
  94. }
  95. MessageConnection::~MessageConnection()
  96. {
  97. KNET_LOG(LogObjectAlloc, "Deleting MessageConnection %p.", this);
  98. // This MessageConnection must have been detached from its owners before deleting it.
  99. // (owner->CloseConnection must have been called)
  100. // We can't have a worker thread referencing to this connection any more, since it would
  101. // be accessing a dangling pointer.
  102. assert(owner == 0);
  103. assert(ownerServer == 0);
  104. assert(workerThread == 0);
  105. FreeMessageData();
  106. eventMsgsOutAvailable.Close();
  107. }
  108. ConnectionState MessageConnection::GetConnectionState() const
  109. {
  110. // If we have now low-level socket at all, we have been already deinitialized.
  111. if (!socket)
  112. return ConnectionClosed;
  113. // If the connection is still pending, socket is not read or write open, but we should not think
  114. // the connection is closed though.
  115. if (connectionState == ConnectionPending)
  116. return ConnectionPending;
  117. if (!socket->IsReadOpen() && !socket->IsWriteOpen())
  118. return ConnectionClosed;
  119. if (!socket->IsReadOpen())
  120. return ConnectionPeerClosed;
  121. if (!socket->IsWriteOpen())
  122. return ConnectionDisconnecting;
  123. return connectionState;
  124. }
  125. bool MessageConnection::IsReadOpen() const
  126. {
  127. if (NumInboundMessagesPending() > 0) // We are always read-open if there are any messages pending to be read.
  128. return true;
  129. if (socket && socket->IsOverlappedReceiveReady()) // If the socket is physically open, we are read-open.
  130. return true;
  131. if (!socket || connectionState == ConnectionPeerClosed || connectionState == ConnectionClosed) // Check against the main connectionState variable.
  132. return false;
  133. return true;
  134. }
  135. bool MessageConnection::IsWriteOpen() const
  136. {
  137. return socket && socket->IsWriteOpen() &&
  138. GetConnectionState() != ConnectionDisconnecting && GetConnectionState() != ConnectionClosed;
  139. }
  140. bool MessageConnection::IsPending() const
  141. {
  142. if (!socket)
  143. return false;
  144. return GetConnectionState() == ConnectionPending;
  145. }
  146. void MessageConnection::RunModalClient()
  147. {
  148. AssertInMainThreadContext();
  149. while(GetConnectionState() != ConnectionClosed)
  150. {
  151. Process();
  152. ///\todo WSACreateEvent/WSAWaitForMultipleEvents for improved responsiveness and performance.
  153. Clock::Sleep(10);
  154. }
  155. }
  156. bool MessageConnection::WaitToEstablishConnection(int maxMSecsToWait)
  157. {
  158. AssertInMainThreadContext();
  159. if (!IsPending())
  160. return Connected();
  161. if (GetConnectionState() != ConnectionPending)
  162. return false;
  163. PolledTimer timer((float)maxMSecsToWait);
  164. while(GetConnectionState() == ConnectionPending && !timer.Test())
  165. Clock::Sleep(1); ///\todo Instead of waiting multiple 1msec slices, should wait for proper event.
  166. KNET_LOG(LogWaits, "MessageConnection::WaitToEstablishConnection: Waited %f msecs for connection. Result: %s.",
  167. timer.MSecsElapsed(), ConnectionStateToString(GetConnectionState()).c_str());
  168. return GetConnectionState() == ConnectionOK;
  169. }
  170. void MessageConnection::Disconnect(int maxMSecsToWait)
  171. {
  172. AssertInMainThreadContext();
  173. if (!socket || !socket->IsWriteOpen())
  174. return;
  175. if (connectionState == ConnectionClosed || connectionState == ConnectionDisconnecting)
  176. return;
  177. KNET_LOG(LogInfo, "MessageConnection::Disconnect(%d msecs): Write-closing connection. connectionState = %s, socket readOpen:%s, socket writeOpen:%s.",
  178. maxMSecsToWait, ConnectionStateToString(connectionState).c_str(), socket->IsReadOpen() ? "true":"false",
  179. socket->IsWriteOpen() ? "true":"false");
  180. assert(maxMSecsToWait >= 0);
  181. PerformDisconnection();
  182. if (maxMSecsToWait > 0)
  183. {
  184. PolledTimer timer((float)maxMSecsToWait);
  185. while(socket && socket->IsWriteOpen() && !timer.Test())
  186. {
  187. Clock::Sleep(1); ///\todo Instead of waiting multiple 1msec slices, should wait for proper event.
  188. }
  189. KNET_LOG(LogWaits, "MessageConnection::Disconnect: Waited %f msecs for disconnection. Result: %s.",
  190. timer.MSecsElapsed(), ConnectionStateToString(GetConnectionState()).c_str());
  191. }
  192. if (GetConnectionState() == ConnectionClosed)
  193. Close(0);
  194. }
  195. void MessageConnection::Close(int maxMSecsToWait) // [main thread]
  196. {
  197. AssertInMainThreadContext();
  198. if (maxMSecsToWait > 0 && socket && socket->IsWriteOpen())
  199. {
  200. Disconnect(maxMSecsToWait);
  201. KNET_LOG(LogInfo, "MessageConnection::Close(%d msecs): Disconnecting. connectionState = %s, readOpen:%s, writeOpen:%s.",
  202. maxMSecsToWait, ConnectionStateToString(connectionState).c_str(), (socket && socket->IsReadOpen()) ? "true":"false",
  203. (socket && socket->IsWriteOpen()) ? "true":"false");
  204. }
  205. if (owner)
  206. {
  207. KNET_LOG(LogInfo, "MessageConnection::Close: Closed connection to %s.", ToString().c_str());
  208. owner->CloseConnection(this); // This will cause this connection to be disconnected of its worker thread, so that we can safely proceed to tear down the socket.
  209. assert(!IsWorkerThreadRunning());
  210. owner = 0;
  211. ownerServer = 0;
  212. }
  213. if (socket)
  214. {
  215. socket->Close();
  216. assert(!IsWorkerThreadRunning());
  217. socket = 0; // Worker thread assumes access to the socket pointer, so can't have the thread running any more when we are doing this.
  218. }
  219. connectionState = ConnectionClosed;
  220. if (outboundAcceptQueue.Size() > 0)
  221. KNET_LOG(LogVerbose, "MessageConnection::Close(): Had %d messages in outboundAcceptQueue!", (int)outboundAcceptQueue.Size());
  222. if (outboundQueue.Size() > 0)
  223. KNET_LOG(LogVerbose, "MessageConnection::Close(): Had %d messages in outboundQueue!", (int)outboundQueue.Size());
  224. if (inboundMessageQueue.Size() > 0)
  225. KNET_LOG(LogVerbose, "MessageConnection::Close(): Had %d messages in inboundMessageQueue!", (int)inboundMessageQueue.Size());
  226. if (fragmentedSends.UnsafeGetValue().transfers.size() > 0)
  227. KNET_LOG(LogVerbose, "MessageConnection::Close(): Had %d messages in fragmentedSends.transfers list!", (int)fragmentedSends.UnsafeGetValue().transfers.size());
  228. if (!fragmentedReceives.transfers.empty())
  229. KNET_LOG(LogVerbose, "MessageConnection::Close(): Had %d messages in fragmentedReceives.transfers list!", (int)fragmentedReceives.transfers.size());
  230. FreeMessageData();
  231. }
  232. void MessageConnection::PauseOutboundSends()
  233. {
  234. AssertInMainThreadContext();
  235. eventMsgsOutAvailable.Reset();
  236. bOutboundSendsPaused = true;
  237. }
  238. void MessageConnection::ResumeOutboundSends()
  239. {
  240. AssertInMainThreadContext();
  241. bOutboundSendsPaused = false;
  242. if (NumOutboundMessagesPending() > 0)
  243. eventMsgsOutAvailable.Set();
  244. }
  245. void MessageConnection::SetPeerClosed()
  246. {
  247. AssertInWorkerThreadContext();
  248. switch(connectionState)
  249. {
  250. case ConnectionPending:
  251. KNET_LOG(LogVerbose, "Peer closed connection when in ConnectionPending state!");
  252. connectionState = ConnectionClosed; // Just tear it down, the peer rejected the connection.
  253. break;
  254. case ConnectionOK:
  255. connectionState = ConnectionPeerClosed;
  256. break;
  257. case ConnectionDisconnecting:
  258. connectionState = ConnectionClosed;
  259. break;
  260. case ConnectionPeerClosed:
  261. case ConnectionClosed:
  262. break; // We've already in the state where peer has closed the connection, no need to do anything.
  263. default:
  264. KNET_LOG(LogError, "SetPeerClosed() called at an unexpected time. The internal connectionState has an invalid value %d!", (int)connectionState);
  265. assert(false);
  266. break;
  267. }
  268. }
  269. void MessageConnection::FreeMessageData() // [main thread]
  270. {
  271. assert(!IsWorkerThreadRunning());
  272. Lockable<FragmentedSendManager>::LockType sends = fragmentedSends.Acquire();
  273. sends->FreeAllTransfers();
  274. fragmentedReceives.transfers.clear();
  275. while(outboundAcceptQueue.Size() > 0)
  276. {
  277. NetworkMessage *msg = outboundAcceptQueue.TakeFront();
  278. delete msg;
  279. }
  280. while(inboundMessageQueue.Size() > 0)
  281. {
  282. NetworkMessage *msg = inboundMessageQueue.TakeFront();
  283. delete msg;
  284. }
  285. #ifdef KNET_NO_MAXHEAP
  286. for(unsigned long i = 0; i < outboundQueue.Size(); ++i)
  287. delete *outboundQueue.ItemAt(i);
  288. #else
  289. for(int i = 0; i < outboundQueue.Size(); ++i)
  290. delete outboundQueue.data[i];
  291. #endif
  292. outboundQueue.Clear();
  293. inboundContentIDStamps.clear();
  294. outboundContentIDMessages.clear();
  295. Lockable<ConnectionStatistics>::LockType stats_ = statistics.Acquire();
  296. stats_->ping.clear();
  297. stats_->recvPacketIDs.clear();
  298. stats_->traffic.clear();
  299. networkSendSimulator.Free();
  300. }
  301. void MessageConnection::DetectConnectionTimeOut()
  302. {
  303. AssertInWorkerThreadContext();
  304. if (connectionState == ConnectionClosed)
  305. return;
  306. float lastHeardSince = LastHeardTime();
  307. if (lastHeardSince > connectionLostTimeout)
  308. {
  309. KNET_LOG(LogInfo, "It's been %.2fms since last heard from other end. connectionLostTimeout=%.2fms, so closing connection.",
  310. lastHeardSince, connectionLostTimeout);
  311. connectionState = ConnectionClosed;
  312. }
  313. }
  314. void MessageConnection::AcceptOutboundMessages() // [worker thread]
  315. {
  316. AssertInWorkerThreadContext();
  317. // If we are write-closed, discard all outbound messages from the client code, since we can't send them to the peer.
  318. if (connectionState == ConnectionDisconnecting || connectionState == ConnectionClosed)
  319. {
  320. while(outboundAcceptQueue.Size() > 0)
  321. {
  322. NetworkMessage *msg = *outboundAcceptQueue.Front();
  323. outboundAcceptQueue.PopFront();
  324. KNET_LOG(LogVerbose, "Warning: Discarding outbound network message with ID %d, since the connection is write-closed.",
  325. msg->id);
  326. // assert(!HaveOutboundMessageWithContentID(msg));
  327. FreeMessage(msg);
  328. }
  329. }
  330. // assert(ContainerUniqueAndNoNullElements(outboundAcceptQueue));
  331. // To throttle an over-eager main application, only accept this many messages from the main thread
  332. // at each execution frame.
  333. int numMessagesToAcceptPerFrame = 500;
  334. // Empty the queue from messages that the main thread has submitted for sending.
  335. while(outboundAcceptQueue.Size() > 0 && --numMessagesToAcceptPerFrame > 0)
  336. {
  337. assert(outboundAcceptQueue.Front() != 0);
  338. NetworkMessage *msg = *outboundAcceptQueue.Front();
  339. outboundAcceptQueue.PopFront();
  340. #ifdef KNET_NO_MAXHEAP
  341. outboundQueue.InsertWithResize(msg);
  342. #else
  343. outboundQueue.Insert(msg);
  344. #endif
  345. CheckAndSaveOutboundMessageWithContentID(msg);
  346. }
  347. // assert(ContainerUniqueAndNoNullElements(outboundQueue));
  348. // assert(ContainerUniqueAndNoNullElements(outboundAcceptQueue));
  349. }
  350. void MessageConnection::UpdateConnection() // [Called from the worker thread]
  351. {
  352. AssertInWorkerThreadContext();
  353. if (!socket)
  354. return;
  355. AcceptOutboundMessages();
  356. networkSendSimulator.Process();
  357. // MessageConnection needs to automatically manage the sending of ping messages in an unreliable channel.
  358. if (connectionState == ConnectionOK && pingTimer.TriggeredOrNotRunning())
  359. {
  360. if (!bOutboundSendsPaused)
  361. SendPingRequestMessage(true);
  362. DetectConnectionTimeOut();
  363. pingTimer.StartMSecs(pingIntervalMSecs);
  364. }
  365. // Produce statistics back to the application about the current connection state.
  366. if (statsRefreshTimer.TriggeredOrNotRunning())
  367. {
  368. ComputeStats();
  369. // Check if the socket is dead and mark it read-closed.
  370. if (connectionState == ConnectionOK || connectionState == ConnectionDisconnecting)
  371. if (!socket || !socket->IsReadOpen())
  372. {
  373. KNET_LOG(LogInfo, "Peer closed connection.");
  374. SetPeerClosed();
  375. }
  376. ADDEVENT("roundTripTime", RoundTripTime(), "msecs");
  377. ADDEVENT("lastHeardTime", LastHeardTime(), "msecs");
  378. ADDEVENT("packetsInPerSec", PacketsInPerSec(), "#");
  379. ADDEVENT("packetsOutPerSec", PacketsOutPerSec(), "#");
  380. ADDEVENT("msgsInPerSec", MsgsInPerSec(), "#");
  381. ADDEVENT("msgsOutPerSec", MsgsOutPerSec(), "#");
  382. ADDEVENT("bytesInPerSec", BytesInPerSec(), "bytes");
  383. ADDEVENT("bytesOutPerSec", BytesOutPerSec(), "bytes");
  384. ADDEVENT("bytesInTotal", (float)BytesInTotal(), "bytes");
  385. ADDEVENT("bytesOutTotal", (float)BytesOutTotal(), "bytes");
  386. statsRefreshTimer.StartMSecs(statsRefreshIntervalMSecs);
  387. }
  388. // Perform the TCP/UDP -specific connection update.
  389. DoUpdateConnection();
  390. }
  391. NetworkMessage *MessageConnection::AllocateNewMessage()
  392. {
  393. NetworkMessage *msg = messagePool.New();
  394. KNET_LOG(LogObjectAlloc, "MessageConnection::AllocateMessage %p!", msg);
  395. return msg;
  396. }
  397. void MessageConnection::FreeMessage(NetworkMessage *msg) // [main and worker thread]
  398. {
  399. if (!msg)
  400. return;
  401. if (msg->transfer)
  402. {
  403. msg->transfer->RemoveMessage(msg);
  404. msg->transfer = 0;
  405. }
  406. KNET_LOG(LogObjectAlloc, "MessageConnection::FreeMessage %p!", msg);
  407. messagePool.Free(msg);
  408. }
  409. NetworkMessage *MessageConnection::StartNewMessage(unsigned long id, size_t numBytes)
  410. {
  411. NetworkMessage *msg = AllocateNewMessage();
  412. if (!msg)
  413. {
  414. KNET_LOG(LogError, "MessageConnection::SendMessage: StartNewMessage failed! Discarding message send.");
  415. return 0; // Failed to allocate a new message. This is caused only by memory allocation issues.
  416. }
  417. msg->id = id;
  418. msg->reliable = false;
  419. msg->contentID = 0;
  420. msg->obsolete = false;
  421. // Give the new message the lowest priority by default.
  422. msg->priority = 0;
  423. // By default, the message is not fragmented. Later when admitting the message into the send queue, the need for
  424. // fragmentation is examined and this field will be updated if needed.
  425. msg->transfer = 0;
  426. #ifdef KNET_NETWORK_PROFILING
  427. msg->profilerName = "";
  428. #endif
  429. msg->Resize(numBytes);
  430. return msg;
  431. }
  432. void MessageConnection::SplitAndQueueMessage(NetworkMessage *message, bool internalQueue, size_t maxFragmentSize)
  433. {
  434. using namespace std;
  435. #ifdef KNET_THREAD_CHECKING_ENABLED
  436. if (internalQueue)
  437. AssertInWorkerThreadContext();
  438. else
  439. AssertInMainThreadContext();
  440. #endif
  441. assert(message);
  442. assert(!message->obsolete);
  443. // We need this many fragments to represent the whole message.
  444. const size_t totalNumFragments = (message->dataSize + maxFragmentSize - 1) / maxFragmentSize;
  445. assert(totalNumFragments > 1); // Shouldn't be calling this function if the message can well fit into one fragment.
  446. KNET_LOG(LogVerbose, "Splitting a message of %db into %d fragments of %db size at most.",
  447. (int)message->dataSize, (int)totalNumFragments, (int)maxFragmentSize);
  448. /** \todo Would like to do this:
  449. FragmentedSendManager::FragmentedTransfer *transfer;
  450. {
  451. Lock<FragmentedSendManager> sends = fragmentedSends.Acquire();
  452. transfer = sends->AllocateNewFragmentedTransfer();
  453. }
  454. */
  455. // But instead, have to resort to function-wide lock.
  456. Lock<FragmentedSendManager> sends = fragmentedSends.Acquire();
  457. FragmentedSendManager::FragmentedTransfer *transfer = sends->AllocateNewFragmentedTransfer();
  458. size_t currentFragmentIndex = 0;
  459. size_t byteOffset = 0;
  460. assert(transfer != 0);
  461. transfer->totalNumFragments = totalNumFragments;
  462. if (!message->reliable)
  463. {
  464. KNET_LOG(LogVerbose, "Upgraded a nonreliable message with ID %d and size %d to a reliable message since it had to be fragmented!", (int)message->id, (int)message->dataSize);
  465. }
  466. if (message->contentID != 0)
  467. {
  468. KNET_LOG(LogVerbose, "Warning: Content IDs are not supported with fragmented transfers. Removing the content ID %d of message %d of size %d.",
  469. (int)message->contentID, (int)message->id, (int)message->Size());
  470. message->contentID = 0;
  471. }
  472. // Split the message into fragments.
  473. while(byteOffset < message->dataSize)
  474. {
  475. const size_t thisFragmentSize = min(maxFragmentSize, message->dataSize - byteOffset);
  476. NetworkMessage *fragment = StartNewMessage(message->id, thisFragmentSize);
  477. fragment->contentID = message->contentID;
  478. fragment->inOrder = message->inOrder;
  479. fragment->reliable = true; // We don't send fragmented messages as unreliable messages - the risk of a fragment getting lost wastes bandwidth.
  480. fragment->messageNumber = outboundMessageNumberCounter++; ///\todo Convert to atomic increment, or this is a race condition.
  481. fragment->priority = message->priority;
  482. fragment->sendCount = 0;
  483. fragment->transfer = transfer;
  484. fragment->fragmentIndex = currentFragmentIndex++;
  485. fragment->reliableMessageNumber = outboundReliableMessageNumberCounter++; ///\todo Convert to atomic increment, or this is a race condition.
  486. #ifdef KNET_NETWORK_PROFILING
  487. fragment->profilerName = message->profilerName + "_Fragment";
  488. #endif
  489. // Copy the data from the old message that's supposed to go into this fragment.
  490. memcpy(fragment->data, message->data + byteOffset, thisFragmentSize);
  491. byteOffset += thisFragmentSize;
  492. transfer->AddMessage(fragment);
  493. if (internalQueue) // if true, we are accessing from the worker thread, and can directly access the outboundQueue member.
  494. {
  495. // assert(ContainerUniqueAndNoNullElements(outboundQueue));
  496. #ifdef KNET_NO_MAXHEAP
  497. outboundQueue.InsertWithResize(fragment);
  498. #else
  499. outboundQueue.Insert(fragment);
  500. #endif
  501. // assert(ContainerUniqueAndNoNullElements(outboundQueue));
  502. }
  503. else
  504. {
  505. if (!outboundAcceptQueue.Insert(fragment))
  506. {
  507. ///\todo Is it possible to check beforehand if this criteria is avoided, or if we are doomed?
  508. KNET_LOG(LogError, "Critical: Failed to add message fragment to outboundAcceptQueue! Queue was full. Do not know how to recover here!");
  509. assert(false);
  510. }
  511. }
  512. }
  513. // Signal the worker thread that there are new outbound events available.
  514. if (!bOutboundSendsPaused)
  515. eventMsgsOutAvailable.Set();
  516. // The original message that was split into fragments is no longer needed - it is represented by the newly created fragments
  517. // that have now been queued.
  518. FreeMessage(message);
  519. }
  520. void MessageConnection::EndAndQueueMessage(NetworkMessage *msg, size_t numBytes, bool internalQueue)
  521. {
  522. #ifdef KNET_THREAD_CHECKING_ENABLED
  523. if (internalQueue)
  524. AssertInWorkerThreadContext();
  525. else
  526. AssertInMainThreadContext();
  527. #endif
  528. assert(msg);
  529. if (!msg)
  530. return;
  531. // If the message was marked obsolete to start with, discard it.
  532. if (msg->obsolete || !socket || GetConnectionState() == ConnectionClosed || !socket->IsWriteOpen() ||
  533. (internalQueue == false && !IsWriteOpen()))
  534. {
  535. KNET_LOG(LogVerbose, "MessageConnection::EndAndQueueMessage: Discarded message with ID 0x%X and size %d bytes. "
  536. "msg->obsolete: %d. socket ptr: %p. ConnectionState: %s. socket->IsWriteOpen(): %s. msgconn->IsWriteOpen: %s. "
  537. "internalQueue: %s.",
  538. (int)msg->id, (int)numBytes, (int)msg->obsolete, socket, ConnectionStateToString(GetConnectionState()).c_str(), (socket && socket->IsWriteOpen()) ? "true" : "false",
  539. IsWriteOpen() ? "true" : "false", internalQueue ? "true" : "false");
  540. FreeMessage(msg);
  541. return;
  542. }
  543. // Remember the amount of bytes the client said to be using for later.
  544. if (numBytes != (size_t)(-1))
  545. msg->dataSize = numBytes;
  546. assert(msg->dataSize <= msg->Capacity());
  547. if (msg->dataSize > msg->Capacity())
  548. {
  549. KNET_LOG(LogError, "Critical! User specified a larger NetworkMessage than there is Capacity() for. Call NetworkMessage::Reserve() "
  550. "to ensure there is a proper amount of space for the buffer! Specified: %d bytes, Capacity(): %d bytes.",
  551. (int)msg->dataSize, (int)msg->Capacity());
  552. }
  553. // Check if the message is too big - in that case we split it into fixed size fragments and add them into the queue.
  554. ///\todo We can optimize here by doing the splitting at datagram creation time to create optimally sized datagrams, but
  555. /// it is quite more complicated, so left for later.
  556. const size_t sendHeaderUpperBound = 32; // Reserve some bytes for the packet and message headers. (an approximate upper bound)
  557. if (msg->dataSize + sendHeaderUpperBound > socket->MaxSendSize() && socket->TransportLayer() == SocketOverUDP)
  558. {
  559. const size_t maxFragmentSize = socket->MaxSendSize() / 4 - sendHeaderUpperBound; ///\todo Check this is ok.
  560. assert(maxFragmentSize > 0 && maxFragmentSize < socket->MaxSendSize());
  561. SplitAndQueueMessage(msg, internalQueue, maxFragmentSize);
  562. return;
  563. }
  564. msg->messageNumber = outboundMessageNumberCounter++; ///\todo Convert to atomic increment, or this is a race condition.
  565. msg->reliableMessageNumber = (msg->reliable ? outboundReliableMessageNumberCounter++ : 0); ///\todo Convert to atomic increment, or this is a race condition.
  566. msg->sendCount = 0;
  567. if (internalQueue) // if true, we are accessing from the worker thread, and can directly access the outboundQueue member.
  568. {
  569. KNET_LOG(LogVerbose, "MessageConnection::EndAndQueueMessage: Internal-queued message of size %d bytes and ID 0x%X.", (int)msg->Size(), (int)msg->id);
  570. // assert(ContainerUniqueAndNoNullElements(outboundQueue));
  571. #ifdef KNET_NO_MAXHEAP
  572. outboundQueue.InsertWithResize(msg);
  573. #else
  574. outboundQueue.Insert(msg);
  575. #endif
  576. // assert(ContainerUniqueAndNoNullElements(outboundQueue));
  577. }
  578. else
  579. {
  580. if (!outboundAcceptQueue.Insert(msg))
  581. {
  582. if (msg->reliable) // For nonreliable messages it is not critical if we can't enqueue the message. Just discard it.
  583. {
  584. ///\todo Is it possible to check beforehand if this criteria is avoided, or if we are doomed?
  585. KNET_LOG(LogVerbose, "Critical: Failed to add new reliable message to outboundAcceptQueue! Queue was full. Discarding the message!");
  586. assert(false);
  587. }
  588. FreeMessage(msg);
  589. return;
  590. }
  591. KNET_LOG(LogData, "MessageConnection::EndAndQueueMessage: Queued message of size %d bytes and ID 0x%X.", (int)msg->Size(), (int)msg->id);
  592. }
  593. // Signal the worker thread that there are new outbound events available.
  594. if (!bOutboundSendsPaused)
  595. eventMsgsOutAvailable.Set();
  596. }
  597. void MessageConnection::SendMessage(unsigned long id, bool reliable, bool inOrder, unsigned long priority,
  598. unsigned long contentID, const char *data, size_t numBytes)
  599. {
  600. AssertInMainThreadContext();
  601. NetworkMessage *msg = StartNewMessage(id, numBytes);
  602. if (!msg)
  603. {
  604. KNET_LOG(LogError, "MessageConnection::SendMessage: StartNewMessage failed! Discarding message send.");
  605. return;
  606. }
  607. msg->reliable = reliable;
  608. msg->inOrder = inOrder;
  609. msg->priority = priority;
  610. msg->contentID = contentID;
  611. assert(msg->data);
  612. assert(msg->Size() == numBytes);
  613. memcpy(msg->data, data, numBytes);
  614. EndAndQueueMessage(msg);
  615. }
  616. /// Called from the main thread to fetch & handle all new inbound messages.
  617. void MessageConnection::Process(int maxMessagesToProcess)
  618. {
  619. AssertInMainThreadContext();
  620. assert(maxMessagesToProcess >= 0);
  621. // Check the status of the connection worker thread.
  622. if (connectionState == ConnectionClosed || !socket || !socket->Connected())
  623. {
  624. if (socket)
  625. Close(); ///\todo This will block, since it is called with the default time period.
  626. connectionState = ConnectionClosed;
  627. return;
  628. }
  629. // The number of messages we are willing to process this cycle. If there are fewer messages than this
  630. // to process, we will return immediately (won't wait for this many messages to actually be received, it is just an upper limit).
  631. int numMessagesLeftToProcess = maxMessagesToProcess;
  632. while(inboundMessageQueue.Size() > 0 && (numMessagesLeftToProcess-- > 0 || maxMessagesToProcess == 0))
  633. {
  634. if (!inboundMessageHandler)
  635. {
  636. KNET_LOG(LogVerbose, "Warning! Cannot process messages since no message handler registered to connection %s!",
  637. ToString().c_str());
  638. return;
  639. }
  640. NetworkMessage **message = inboundMessageQueue.Front();
  641. assert(message);
  642. NetworkMessage *msg = *message;
  643. inboundMessageQueue.PopFront();
  644. assert(msg);
  645. inboundMessageHandler->HandleMessage(this, msg->receivedPacketID, msg->id, (msg->dataSize > 0) ? msg->data : 0, msg->dataSize);
  646. FreeMessage(msg);
  647. }
  648. }
  649. void MessageConnection::WaitForMessage(int maxMSecsToWait) // [main thread]
  650. {
  651. AssertInMainThreadContext();
  652. // If we have a message to process, no need to wait.
  653. if (inboundMessageQueue.Size() > 0)
  654. return;
  655. // Check the status of the connection worker thread.
  656. if (connectionState == ConnectionClosed)
  657. {
  658. if (socket)
  659. Close();
  660. return;
  661. }
  662. // Wait indefinitely until we get a new message, or the connection is torn down.
  663. if (maxMSecsToWait == 0)
  664. {
  665. ///\todo Log out warning if this takes AGES. Or rather, perhaps remove support for this altogether
  666. /// to avoid deadlocks.
  667. while(inboundMessageQueue.Size() == 0 && GetConnectionState() == ConnectionOK)
  668. Clock::Sleep(1); ///\todo Instead of waiting multiple 1msec slices, should wait for proper event.
  669. }
  670. else
  671. {
  672. PolledTimer timer;
  673. timer.StartMSecs((float)maxMSecsToWait);
  674. while(inboundMessageQueue.Size() == 0 && GetConnectionState() == ConnectionOK && !timer.Test())
  675. Clock::Sleep(1); ///\todo Instead of waiting multiple 1msec slices, should wait for proper event.
  676. if (timer.MSecsElapsed() >= 1000.f)
  677. {
  678. KNET_LOG(LogWaits, "MessageConnection::WaitForMessage: Waited %f msecs for a new message. ConnectionState: %s. %d messages in queue.",
  679. timer.MSecsElapsed(), ConnectionStateToString(GetConnectionState()).c_str(), (int)inboundMessageQueue.Size());
  680. }
  681. }
  682. }
  683. NetworkMessage *MessageConnection::ReceiveMessage(int maxMSecsToWait) // [main thread]
  684. {
  685. AssertInMainThreadContext();
  686. // Check the status of the connection worker thread.
  687. if (connectionState == ConnectionClosed)
  688. {
  689. if (socket)
  690. Close();
  691. return 0;
  692. }
  693. // If we don't have a message, wait for the given duration to receive one.
  694. if (inboundMessageQueue.Size() == 0 && maxMSecsToWait >= 0)
  695. WaitForMessage(maxMSecsToWait);
  696. // Did we get a message even after the max timeout?
  697. if (inboundMessageQueue.Size() == 0)
  698. return 0;
  699. NetworkMessage *message = *inboundMessageQueue.Front();
  700. inboundMessageQueue.PopFront();
  701. assert(message);
  702. return message;
  703. }
  704. bool EraseReliableIfObsoleteOrNotInOrderCmp(const NetworkMessage *msg)
  705. {
  706. assert(msg->reliable);
  707. return msg->inOrder == false || msg->obsolete;
  708. }
  709. bool EraseReliableIfObsoleteCmp(const NetworkMessage *msg)
  710. {
  711. assert(msg->reliable);
  712. return msg->obsolete;
  713. }
  714. int NetworkMessage::GetTotalDatagramPackedSize() const
  715. {
  716. // const int idLength = (transfer == 0 || fragmentIndex == 0) ? VLE8_16_32::GetEncodedBitLength(id)/8 : 0;
  717. // const int headerLength = 2;
  718. const int headerLength = 30; ///\todo This is loose, but since it only needs to be an upper bound, it is safe now.
  719. const int contentLength = dataSize;
  720. return headerLength + contentLength;
  721. // const int fragmentStartLength = (transfer && fragmentIndex == 0) ? VLE8_16_32::GetEncodedBitLength(transfer->totalNumFragments)/8 : 0;
  722. // const int fragmentLength = (transfer ? 1 : 0) + ((transfer && fragmentIndex != 0) ? VLE8_16_32::GetEncodedBitLength(fragmentIndex)/8 : 0);
  723. ///\todo Take into account the inOrder field.
  724. // return idLength + headerLength + contentLength + fragmentStartLength + fragmentLength;
  725. }
  726. void MessageConnection::AddOutboundStats(unsigned long numBytes, unsigned long numPackets, unsigned long numMessages)
  727. {
  728. AssertInWorkerThreadContext();
  729. if (numBytes == 0 && numMessages == 0 && numPackets == 0)
  730. return;
  731. ConnectionStatistics &cs = statistics.LockGet();
  732. cs.traffic.push_back(ConnectionStatistics::TrafficTrack());
  733. ConnectionStatistics::TrafficTrack &t = cs.traffic.back();
  734. t.bytesIn = t.messagesIn = t.packetsIn = 0;
  735. t.bytesOut = numBytes;
  736. t.packetsOut = numPackets;
  737. t.messagesOut = numMessages;
  738. t.tick = Clock::Tick();
  739. statistics.Unlock();
  740. bytesOutTotal += numBytes;
  741. }
  742. void MessageConnection::AddInboundStats(unsigned long numBytes, unsigned long numPackets, unsigned long numMessages)
  743. {
  744. AssertInWorkerThreadContext();
  745. if (numBytes == 0 && numMessages == 0 && numPackets == 0)
  746. return;
  747. ConnectionStatistics &cs = statistics.LockGet();
  748. cs.traffic.push_back(ConnectionStatistics::TrafficTrack());
  749. ConnectionStatistics::TrafficTrack &t = cs.traffic.back();
  750. t.bytesOut = t.messagesOut = t.packetsOut = 0;
  751. t.bytesIn = numBytes;
  752. t.packetsIn = numPackets;
  753. t.messagesIn = numMessages;
  754. t.tick = Clock::Tick();
  755. statistics.Unlock();
  756. bytesInTotal += numBytes;
  757. }
  758. void MessageConnection::ComputeStats()
  759. {
  760. AssertInWorkerThreadContext();
  761. ConnectionStatistics &cs = statistics.LockGet();
  762. const tick_t maxEntryAge = Clock::TicksPerSec() * 5;
  763. const tick_t timeNow = Clock::Tick();
  764. const tick_t maxTickAge = timeNow - maxEntryAge;
  765. for(size_t i = 0; i < cs.traffic.size(); ++i)
  766. if (Clock::IsNewer(cs.traffic[i].tick, maxTickAge))
  767. {
  768. cs.traffic.erase(cs.traffic.begin(), cs.traffic.begin() + i);
  769. break;
  770. }
  771. if (cs.traffic.size() <= 1)
  772. {
  773. bytesInPerSec = bytesOutPerSec = msgsInPerSec = msgsOutPerSec = packetsInPerSec = packetsOutPerSec = 0.f;
  774. statistics.Unlock();
  775. return;
  776. }
  777. unsigned long totalBytesIn = 0;
  778. unsigned long totalBytesOut = 0;
  779. unsigned long totalMsgsIn = 0;
  780. unsigned long totalMsgsOut = 0;
  781. unsigned long totalPacketsIn = 0;
  782. unsigned long totalPacketsOut = 0;
  783. for(size_t i = 0; i < cs.traffic.size(); ++i)
  784. {
  785. totalBytesIn += cs.traffic[i].bytesIn;
  786. totalBytesOut += cs.traffic[i].bytesOut;
  787. totalPacketsIn += cs.traffic[i].packetsIn;
  788. totalPacketsOut += cs.traffic[i].packetsOut;
  789. totalMsgsIn += cs.traffic[i].messagesIn;
  790. totalMsgsOut += cs.traffic[i].messagesOut;
  791. }
  792. tick_t ticks = cs.traffic.back().tick - cs.traffic.front().tick;
  793. float secs = max(1.f, (float)Clock::TicksToMillisecondsD(ticks) / 1000.f);
  794. bytesInPerSec = (float)totalBytesIn / secs;
  795. bytesOutPerSec = (float)totalBytesOut / secs;
  796. packetsInPerSec = (float)totalPacketsIn / secs;
  797. packetsOutPerSec = (float)totalPacketsOut / secs;
  798. msgsInPerSec = (float)totalMsgsIn / secs;
  799. msgsOutPerSec = (float)totalMsgsOut / secs;
  800. statistics.Unlock();
  801. }
  802. void MessageConnection::CheckAndSaveOutboundMessageWithContentID(NetworkMessage *msg)
  803. {
  804. AssertInWorkerThreadContext();
  805. assert(msg);
  806. if (msg->contentID == 0)
  807. return;
  808. MsgContentIDPair key = std::make_pair(msg->id, msg->contentID);
  809. ContentIDSendTrack::iterator iter = outboundContentIDMessages.find(key);
  810. if (iter != outboundContentIDMessages.end()) // We have a previous message in the queue which is now obsoleted by this message.
  811. {
  812. // Sanity check: The message numbers must be in the proper order. msg must have been admitted later to send queue than the existing message.
  813. if (msg->IsNewerThan(*iter->second))
  814. {
  815. iter->second->obsolete = true;
  816. assert(iter->second != msg);
  817. assert(iter->first.first == iter->second->id);
  818. assert(iter->first.second == iter->second->contentID);
  819. assert(iter->first.first == msg->id);
  820. assert(iter->first.second == msg->contentID);
  821. iter->second = msg;
  822. }
  823. else // This shouldn't happen, but gracefully handle that situation if it does!
  824. {
  825. KNET_LOG(LogError, "Warning! Adding new message ID %d, number %d, content ID %d, priority %d, but it was obsoleted by an already existing message number %d.",
  826. (int)msg->id, (int)msg->messageNumber, (int)msg->contentID, (int)iter->second->priority, (int)iter->second->messageNumber);
  827. msg->obsolete = true;
  828. }
  829. }
  830. else
  831. {
  832. outboundContentIDMessages[key] = msg;
  833. }
  834. }
  835. void MessageConnection::ClearOutboundMessageWithContentID(NetworkMessage *msg)
  836. {
  837. AssertInWorkerThreadContext();
  838. if (!msg)
  839. return;
  840. assert(msg);
  841. if (msg->contentID == 0)
  842. return;
  843. MsgContentIDPair key = std::make_pair(msg->id, msg->contentID);
  844. ContentIDSendTrack::iterator iter = outboundContentIDMessages.find(key);
  845. if (iter != outboundContentIDMessages.end())
  846. if (msg == iter->second)
  847. outboundContentIDMessages.erase(iter);
  848. }
  849. bool MessageConnection::CheckAndSaveContentIDStamp(message_id_t messageID, u32 contentID, packet_id_t packetID)
  850. {
  851. AssertInWorkerThreadContext();
  852. assert(contentID != 0);
  853. tick_t now = Clock::Tick();
  854. MsgContentIDPair key = std::make_pair(messageID, contentID);
  855. ContentIDReceiveTrack::iterator iter = inboundContentIDStamps.find(key);
  856. if (iter == inboundContentIDStamps.end())
  857. {
  858. inboundContentIDStamps[key] = std::make_pair(packetID, now);
  859. return true;
  860. }
  861. else
  862. {
  863. if (PacketIDIsNewerThan(packetID, iter->second.first) || (float)Clock::TimespanToMillisecondsD(iter->second.second, now) > 5.f * 1000.f)
  864. {
  865. iter->second = std::make_pair(packetID, now);
  866. return true;
  867. }
  868. else
  869. return false;
  870. }
  871. }
  872. void MessageConnection::HandleInboundMessage(packet_id_t packetID, const char *data, size_t numBytes)
  873. {
  874. AssertInWorkerThreadContext();
  875. if (!socket)
  876. return; // Ignore all messages from connections that have already died.
  877. assert(data && numBytes > 0);
  878. // Read the message ID.
  879. DataDeserializer reader(data, numBytes);
  880. message_id_t messageID = reader.ReadVLE<VLE8_16_32>(); ///\todo Check that there actually is enough space to read.
  881. if (messageID == DataDeserializer::VLEReadError)
  882. {
  883. KNET_LOG(LogError, "Error parsing messageID of a message in socket %s. Data size: %d bytes.", socket->ToString().c_str(), (int)numBytes);
  884. throw NetException("MessageConnection::HandleInboundMessage: Network error occurred when deserializing message ID VLE field!");
  885. }
  886. KNET_LOG(LogData, "Received message with ID %d and size %d from peer %s.", (int)packetID, (int)numBytes, socket->ToString().c_str());
  887. char str[256];
  888. sprintf(str, "messageIn.%u", (unsigned int)messageID);
  889. ADDEVENT(str, (float)reader.BytesLeft(), "bytes");
  890. // Pass the message to TCP/UDP -specific message handler.
  891. bool childHandledMessage = HandleMessage(packetID, messageID, data + reader.BytePos(), reader.BytesLeft());
  892. if (childHandledMessage)
  893. return; // If the derived class handled the message, no need to propagate it further.
  894. switch(messageID)
  895. {
  896. case MsgIdPingRequest:
  897. HandlePingRequestMessage(data + reader.BytePos(), reader.BytesLeft());
  898. break;
  899. case MsgIdPingReply:
  900. HandlePingReplyMessage(data + reader.BytePos(), reader.BytesLeft());
  901. break;
  902. default:
  903. {
  904. NetworkMessage *msg = AllocateNewMessage();
  905. msg->Resize(numBytes);
  906. assert(reader.BitPos() == 0);
  907. memcpy(msg->data, data + reader.BytePos(), reader.BytesLeft());
  908. msg->dataSize = reader.BytesLeft();
  909. msg->id = messageID;
  910. msg->contentID = 0;
  911. msg->receivedPacketID = packetID;
  912. bool success = inboundMessageQueue.Insert(msg);
  913. if (!success)
  914. {
  915. KNET_LOG(LogError, "Failed to add a new message of ID %d and size %dB to inbound queue! Queue was full.",
  916. (int)messageID, (int)msg->dataSize);
  917. FreeMessage(msg);
  918. }
  919. }
  920. break;
  921. }
  922. }
  923. void MessageConnection::SetMaximumDataSendRate(int /*numBytesPerSec*/, int /*numDatagramsPerSec*/)
  924. {
  925. }
  926. void MessageConnection::RegisterInboundMessageHandler(IMessageHandler *handler)
  927. {
  928. AssertInMainThreadContext();
  929. inboundMessageHandler = handler;
  930. }
  931. void MessageConnection::SendPingRequestMessage(bool internalQueue)
  932. {
  933. #ifdef KNET_THREAD_CHECKING_ENABLED
  934. if (internalQueue)
  935. AssertInWorkerThreadContext();
  936. else
  937. AssertInMainThreadContext();
  938. #endif
  939. ConnectionStatistics &cs = statistics.LockGet();
  940. u8 pingID = (u8)((cs.ping.empty()) ? 1 : (cs.ping.back().pingID + 1));
  941. cs.ping.push_back(ConnectionStatistics::PingTrack());
  942. ConnectionStatistics::PingTrack &pingTrack = cs.ping.back();
  943. pingTrack.replyReceived = false;
  944. pingTrack.pingSentTick = Clock::Tick();
  945. pingTrack.pingID = pingID;
  946. statistics.Unlock();
  947. NetworkMessage *msg = StartNewMessage(MsgIdPingRequest, 1);
  948. msg->data[0] = pingID;
  949. msg->priority = NetworkMessage::cMaxPriority - 2;
  950. #ifdef KNET_NETWORK_PROFILING
  951. msg->profilerName = "PingRequest (1)";
  952. #endif
  953. EndAndQueueMessage(msg, 1, internalQueue);
  954. KNET_LOG(LogVerbose, "Enqueued ping message %d.", (int)pingID);
  955. }
  956. void MessageConnection::HandlePingRequestMessage(const char *data, size_t numBytes)
  957. {
  958. AssertInWorkerThreadContext();
  959. if (numBytes != 1)
  960. {
  961. KNET_LOG(LogError, "Malformed PingRequest message received! Size was %d bytes, expected 1 byte!", (int)numBytes);
  962. return;
  963. }
  964. u8 pingID = (u8)*data;
  965. NetworkMessage *msg = StartNewMessage(MsgIdPingReply, 1);
  966. msg->data[0] = pingID;
  967. msg->priority = NetworkMessage::cMaxPriority - 1;
  968. #ifdef KNET_NETWORK_PROFILING
  969. msg->profilerName = "PingReply (2)";
  970. #endif
  971. EndAndQueueMessage(msg, 1, true);
  972. KNET_LOG(LogVerbose, "HandlePingRequestMessage: %d.", (int)pingID);
  973. }
  974. void MessageConnection::HandlePingReplyMessage(const char *data, size_t numBytes)
  975. {
  976. AssertInWorkerThreadContext();
  977. if (numBytes != 1)
  978. {
  979. KNET_LOG(LogError, "Malformed PingReply message received! Size was %d bytes, expected 1 byte!", (int)numBytes);
  980. return;
  981. }
  982. ConnectionStatistics &cs = statistics.LockGet();
  983. // How much to bias the new rtt value against the old rtt estimation. 1.f - 100% biased to the new value. near zero - very stable and nonfluctuant.
  984. const float rttPredictBias = 0.5f;
  985. u8 pingID = *(u8*)data;
  986. for(size_t i = 0; i < cs.ping.size(); ++i)
  987. if (cs.ping[i].pingID == pingID && cs.ping[i].replyReceived == false)
  988. {
  989. cs.ping[i].pingReplyTick = Clock::Tick();
  990. float newRtt = (float)Clock::TicksToMillisecondsD(Clock::TicksInBetween(cs.ping[i].pingReplyTick, cs.ping[i].pingSentTick));
  991. cs.ping[i].replyReceived = true;
  992. statistics.Unlock();
  993. rtt = rttPredictBias * newRtt + (1.f * rttPredictBias) * rtt;
  994. KNET_LOG(LogVerbose, "HandlePingReplyMessage: %d.", (int)pingID);
  995. return;
  996. }
  997. statistics.Unlock();
  998. KNET_LOG(LogError, "Received PingReply with ID %d in socket %s, but no matching PingRequest was ever sent!", (int)pingID, socket->ToString().c_str());
  999. }
  1000. std::string MessageConnection::ToString() const
  1001. {
  1002. if (socket)
  1003. return socket->ToString();
  1004. else
  1005. return "(Not connected)";
  1006. }
  1007. void MessageConnection::DumpStatus() const
  1008. {
  1009. AssertInMainThreadContext();
  1010. char str[4096];
  1011. sprintf(str, "Connection Status: %s.\n"
  1012. "\tInboundMessagesPending: %d.\n"
  1013. "\tOutboundMessagesPending: %d.\n"
  1014. "\tMessageConnection: %s %s %s.\n"
  1015. "\tSocket: %s %s %s %s.\n"
  1016. "\tRound-Trip Time: %.2fms.\n"
  1017. "\tLastHeardTime: %.2fms.\n"
  1018. "\tDatagrams in: %.2f/sec.\n"
  1019. "\tDatagrams out: %.2f/sec.\n"
  1020. "\tMessages in: %.2f/sec.\n"
  1021. "\tMessages out: %.2f/sec.\n"
  1022. "\tBytes in: %s/sec.\n"
  1023. "\tBytes out: %s/sec.\n"
  1024. "\tEventMsgsOutAvailable: %d.\n"
  1025. "\tOverlapped in: %d (event: %s)\n"
  1026. "\tOverlapped out: %d (event: %s)\n"
  1027. "\tTime until next send: %d\n"
  1028. "\toutboundQueue.Size(): %d\n",
  1029. ConnectionStateToString(GetConnectionState()).c_str(),
  1030. (int)NumInboundMessagesPending(),
  1031. (int)NumOutboundMessagesPending(),
  1032. Connected() ? "connected" : "",
  1033. IsReadOpen() ? "readOpen" : "",
  1034. IsWriteOpen() ? "writeOpen" : "",
  1035. socket ? "exists" : "zero",
  1036. (socket && socket->Connected()) ? "connected" : "",
  1037. (socket && socket->IsReadOpen()) ? "readOpen" : "",
  1038. (socket && socket->IsWriteOpen()) ? "writeOpen" : "",
  1039. RoundTripTime(), LastHeardTime(), PacketsInPerSec(), PacketsOutPerSec(),
  1040. MsgsInPerSec(), MsgsOutPerSec(),
  1041. FormatBytes(BytesInPerSec()).c_str(), FormatBytes(BytesOutPerSec()).c_str(),
  1042. (int)eventMsgsOutAvailable.Test(),
  1043. #ifdef _WIN32
  1044. socket ? socket->NumOverlappedReceivesInProgress() : -1,
  1045. #else
  1046. -1,
  1047. #endif
  1048. (socket && socket->GetOverlappedReceiveEvent().Test()) ? "true" : "false",
  1049. #ifdef _WIN32
  1050. socket ? socket->NumOverlappedSendsInProgress() : -1,
  1051. #else
  1052. -1,
  1053. #endif
  1054. (socket && socket->GetOverlappedSendEvent().Test()) ? "true" : "false",
  1055. (int)TimeUntilCanSendPacket(),
  1056. (int)outboundQueue.Size());
  1057. KNET_LOGUSER(str);
  1058. DumpConnectionStatus();
  1059. }
  1060. Event MessageConnection::NewOutboundMessagesEvent() const
  1061. {
  1062. assert(!eventMsgsOutAvailable.IsNull());
  1063. return eventMsgsOutAvailable;
  1064. }
  1065. MessageConnection::SocketReadResult MessageConnection::ReadSocket()
  1066. {
  1067. AssertInWorkerThreadContext();
  1068. size_t ignored = 0;
  1069. return ReadSocket(ignored);
  1070. }
  1071. // This function returns true if the current thread of execution is in the worker thread, or, if there is no
  1072. // worker thread running at all.
  1073. void MessageConnection::AssertInWorkerThreadContext() const
  1074. {
  1075. #ifdef KNET_THREAD_CHECKING_ENABLED
  1076. const bool haveWorkerThread = (workerThread != 0);
  1077. kNet::ThreadId currentThreadId = Thread::CurrentThreadId();
  1078. if (haveWorkerThread && currentThreadId != workerThreadId)
  1079. {
  1080. KNET_LOG(LogError, "Assert failure in MessageConnection::AssertInWorkerThreadContext()!: haveWorkerThread: %s, currentThreadId: %s, workerThreadId: %s,",
  1081. haveWorkerThread ? "true" : "false", ThreadIdToString(currentThreadId).c_str(), ThreadIdToString(workerThreadId).c_str());
  1082. assert(false && "MessageConnection::AssertInWorkerThreadContext assert failure!");
  1083. }
  1084. #endif
  1085. }
  1086. // This function returns true if the current thread of execution is in the main thread.
  1087. void MessageConnection::AssertInMainThreadContext() const
  1088. {
  1089. #ifdef KNET_THREAD_CHECKING_ENABLED
  1090. const bool haveWorkerThread = (workerThread != 0);
  1091. kNet::ThreadId currentThreadId = Thread::CurrentThreadId();
  1092. if (haveWorkerThread && currentThreadId == workerThreadId)
  1093. {
  1094. KNET_LOG(LogError, "Assert failure in MessageConnection::AssertInMainThreadContext()!: haveWorkerThread: %s, currentThreadId: %s, workerThreadId: %s,",
  1095. haveWorkerThread ? "true" : "false", ThreadIdToString(currentThreadId).c_str(), ThreadIdToString(workerThreadId).c_str());
  1096. assert(false && "MessageConnection::AssertInMainThreadContext assert failure!");
  1097. }
  1098. #endif
  1099. }
  1100. void MessageConnection::SetWorkerThread(NetworkWorkerThread *thread)
  1101. {
  1102. workerThread = thread;
  1103. #ifdef KNET_THREAD_CHECKING_ENABLED
  1104. workerThreadId = thread ? thread->ThreadObject().Id() : Thread::NullThreadId();
  1105. #endif
  1106. AssertInMainThreadContext();
  1107. }
  1108. EndPoint MessageConnection::LocalEndPoint() const
  1109. {
  1110. if (socket)
  1111. return socket->LocalEndPoint();
  1112. else
  1113. return EndPoint();
  1114. }
  1115. EndPoint MessageConnection::RemoteEndPoint() const
  1116. {
  1117. if (socket)
  1118. return socket->RemoteEndPoint();
  1119. else
  1120. return EndPoint();
  1121. }
  1122. } // ~kNet