TCPMessageConnection.cpp 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397
  1. /* Copyright The kNet Project.
  2. Licensed under the Apache License, Version 2.0 (the "License");
  3. you may not use this file except in compliance with the License.
  4. You may obtain a copy of the License at
  5. http://www.apache.org/licenses/LICENSE-2.0
  6. Unless required by applicable law or agreed to in writing, software
  7. distributed under the License is distributed on an "AS IS" BASIS,
  8. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  9. See the License for the specific language governing permissions and
  10. limitations under the License. */
  11. /** @file TCPMessageConnection.cpp
  12. @brief */
  13. #include <sstream>
  14. #ifdef KNET_USE_BOOST
  15. #include <boost/thread/thread.hpp>
  16. #endif
  17. #include "kNet/Allocator.h"
  18. #include "kNet/DebugMemoryLeakCheck.h"
  19. #include "kNet/TCPMessageConnection.h"
  20. #include "kNet/NetworkLogging.h"
  21. #include "kNet/DataSerializer.h"
  22. #include "kNet/DataDeserializer.h"
  23. #include "kNet/VLEPacker.h"
  24. #include "kNet/NetException.h"
  25. #include "kNet/Network.h"
  26. namespace kNet
  27. {
  28. /// The maximum size for a TCP message we will allow to be received. If we receive a message larger than this, we consider
  29. /// it as a protocol violation and kill the connection.
  30. static const u32 cMaxReceivableTCPMessageSize = 10 * 1024 * 1024; ///\todo Make this configurable for the connection.
  31. TCPMessageConnection::TCPMessageConnection(Network *owner, NetworkServer *ownerServer, Socket *socket, ConnectionState startingState)
  32. :MessageConnection(owner, ownerServer, socket, startingState),
  33. tcpInboundSocketData(64 * 1024)
  34. {
  35. }
  36. TCPMessageConnection::~TCPMessageConnection()
  37. {
  38. if (owner)
  39. owner->CloseConnection(this);
  40. }
  41. MessageConnection::SocketReadResult TCPMessageConnection::ReadSocket(size_t &totalBytesRead)
  42. {
  43. AssertInWorkerThreadContext();
  44. totalBytesRead = 0;
  45. if (!socket || !socket->IsReadOpen())
  46. return SocketReadError;
  47. using namespace std;
  48. // This is a limit on how many messages we keep in the inbound application buffer at maximum.
  49. // If we receive data from the TCP socket faster than this limit, we stop reading until
  50. // the application handles the previous messages first.
  51. const int arbitraryInboundMessageCapacityLimit = 2048;
  52. if (inboundMessageQueue.CapacityLeft() < arbitraryInboundMessageCapacityLimit)
  53. {
  54. LOG(LogVerbose, "TCPMessageConnection::ReadSocket: Read throttled! Application cannot consume data fast enough.");
  55. return SocketReadThrottled; // Can't read in new data, since the client app can't process it so fast.
  56. }
  57. // This is an arbitrary throttle limit on how much data we read in this function at once. Without this limit,
  58. // a slow computer with a fast network connection and a fast sender at the other end could flood this end
  59. // with so many messages that we wouldn't ever return from the loop below until the sender stops. This would
  60. // starve the processing of all other connections this worker thread has to manage.
  61. const size_t maxBytesToRead = 1024 * 1024;
  62. // Pump the socket's receiving end until it's empty or can't process any more for now.
  63. while(totalBytesRead < maxBytesToRead)
  64. {
  65. assert(socket);
  66. // If we don't have enough free space in the ring buffer (even after compacting), throttle the reading of data.
  67. if (tcpInboundSocketData.ContiguousFreeBytesLeft() < 16384 && tcpInboundSocketData.Capacity() > 1048576)
  68. {
  69. tcpInboundSocketData.Compact();
  70. if (tcpInboundSocketData.ContiguousFreeBytesLeft() < 16384)
  71. return SocketReadThrottled;
  72. }
  73. OverlappedTransferBuffer *buffer = socket->BeginReceive();
  74. if (!buffer)
  75. break; // Nothing to receive.
  76. // If we can't fit the data we got, compact the ring buffer.
  77. if (buffer->bytesContains > tcpInboundSocketData.ContiguousFreeBytesLeft())
  78. {
  79. tcpInboundSocketData.Compact();
  80. if (buffer->bytesContains > tcpInboundSocketData.ContiguousFreeBytesLeft())
  81. {
  82. // Even compacting didn't get enough space to fit the message, so resize the ring buffer to be able to contain the message.
  83. // At least always double the capacity of the buffer, so that we don't waste effort incrementing the capacity by too small amounts at a time.
  84. tcpInboundSocketData.Resize(max(tcpInboundSocketData.Capacity()*2, tcpInboundSocketData.Capacity() + buffer->bytesContains - tcpInboundSocketData.ContiguousFreeBytesLeft()));
  85. LOG(LogWaits, "TCPMessageConnection::ReadSocket: Performance warning! Resized the capacity of the receive ring buffer to %d bytes to accommodate a message of size %d (now have %d bytes of free space)",
  86. tcpInboundSocketData.Capacity(), buffer->bytesContains, tcpInboundSocketData.ContiguousFreeBytesLeft());
  87. }
  88. }
  89. LOG(LogData, "TCPMessageConnection::ReadSocket: Received %d bytes from the network from peer %s.",
  90. buffer->bytesContains, socket->ToString().c_str());
  91. assert((size_t)buffer->bytesContains <= (size_t)tcpInboundSocketData.ContiguousFreeBytesLeft());
  92. ///\todo For performance, this memcpy can be optimized away. We can parse the message directly
  93. /// from this buffer without copying it to a temporary working buffer. Detect if message straddles
  94. /// two OverlappedTransferBuffers and only in that case memcpy that message to form a
  95. /// single contiguous memory area.
  96. memcpy(tcpInboundSocketData.End(), buffer->buffer.buf, buffer->bytesContains);
  97. tcpInboundSocketData.Inserted(buffer->bytesContains); // Mark the memory area in the ring buffer as used.
  98. totalBytesRead += buffer->bytesContains;
  99. socket->EndReceive(buffer);
  100. }
  101. // Update statistics about the connection.
  102. if (totalBytesRead > 0)
  103. {
  104. lastHeardTime = Clock::Tick();
  105. ADDEVENT("tcpDataIn", (float)totalBytesRead, "bytes");
  106. AddInboundStats(totalBytesRead, 1, 0);
  107. }
  108. // Finally, try to parse any bytes we received to complete messages. Any bytes consisting a partial
  109. // message will be left into the tcpInboundSocketData partial buffer to wait for more bytes to be received later.
  110. ExtractMessages();
  111. if (totalBytesRead >= maxBytesToRead)
  112. return SocketReadThrottled;
  113. else
  114. return SocketReadOK;
  115. }
  116. /// Checks that the specified conditions for the container apply.
  117. /// Warning: This is a non-threadsafe check for the container, only to be used for debugging.
  118. /// Warning #2: This function is very slow, as it performs a N^2 search through the container.
  119. template<typename T>
  120. bool ContainerUniqueAndNoNullElements(const std::vector<T> &cont)
  121. {
  122. for(size_t i = 0; i < cont.size(); ++i)
  123. for(size_t j = i+1; j < cont.size(); ++j)
  124. if (cont[i] == cont[j] || cont[i] == 0)
  125. return false;
  126. return true;
  127. }
  128. /// Packs several messages from the outbound priority queue into a single packet and sends it out the wire.
  129. /// @return False if the send was a failure and sending should not be tried again at this time, true otherwise.
  130. MessageConnection::PacketSendResult TCPMessageConnection::SendOutPacket()
  131. {
  132. AssertInWorkerThreadContext();
  133. if (bOutboundSendsPaused || outboundQueue.Size() == 0)
  134. return PacketSendNoMessages;
  135. if (!socket || !socket->IsWriteOpen())
  136. {
  137. LOG(LogVerbose, "TCPMessageConnection::SendOutPacket: Socket is not write open %p!", socket);
  138. if (connectionState == ConnectionOK) ///\todo This is slightly annoying to manually update the state here,
  139. connectionState = ConnectionPeerClosed; /// reorganize to be able to have this automatically apply.
  140. if (connectionState == ConnectionDisconnecting)
  141. connectionState = ConnectionClosed;
  142. return PacketSendSocketClosed;
  143. }
  144. // 'serializedMessages' is a temporary data structure used only by this member function.
  145. // It caches a list of all the messages we are pushing out during this call.
  146. serializedMessages.clear();
  147. // In the following, we start coalescing multiple messages into a single socket send() calls.
  148. // Get the maximum number of bytes we can coalesce for the send() call. This is only a soft limit
  149. // in the sense that if we encounter a single message that is larger than this limit, then we try
  150. // to send that through in one send() call.
  151. // const size_t maxSendSize = socket->MaxSendSize();
  152. // Push out all the pending data to the socket.
  153. OverlappedTransferBuffer *overlappedTransfer = 0;
  154. int numMessagesPacked = 0;
  155. DataSerializer writer;
  156. // assert(ContainerUniqueAndNoNullElements(outboundQueue)); // This precondition should always hold (but very heavy to test, uncomment to debug)
  157. while(outboundQueue.Size() > 0)
  158. {
  159. #ifdef KNET_NO_MAXHEAP
  160. NetworkMessage *msg = *outboundQueue.Front();
  161. #else
  162. NetworkMessage *msg = outboundQueue.Front();
  163. #endif
  164. if (msg->obsolete)
  165. {
  166. ClearOutboundMessageWithContentID(msg);
  167. FreeMessage(msg);
  168. outboundQueue.PopFront();
  169. continue;
  170. }
  171. const int encodedMsgIdLength = VLE8_16_32::GetEncodedBitLength(msg->id) / 8;
  172. const size_t messageContentSize = msg->dataSize + encodedMsgIdLength; // 1 byte: Message ID. X bytes: Content.
  173. const int encodedMsgSizeLength = VLE8_16_32::GetEncodedBitLength(messageContentSize) / 8;
  174. const size_t totalMessageSize = messageContentSize + encodedMsgSizeLength; // 2 bytes: Content length. X bytes: Content.
  175. if (!overlappedTransfer)
  176. {
  177. overlappedTransfer = socket->BeginSend(std::max<size_t>(socket->MaxSendSize(), totalMessageSize));
  178. if (!overlappedTransfer)
  179. {
  180. LOG(LogError, "TCPMessageConnection::SendOutPacket: Starting an overlapped send failed!");
  181. assert(serializedMessages.size() == 0);
  182. return PacketSendSocketClosed;
  183. }
  184. writer = DataSerializer(overlappedTransfer->buffer.buf, overlappedTransfer->buffer.len);
  185. }
  186. // If this message won't fit into the buffer, send out all previously gathered messages.
  187. if (writer.BytesLeft() < totalMessageSize)
  188. break;
  189. writer.AddVLE<VLE8_16_32>(messageContentSize);
  190. writer.AddVLE<VLE8_16_32>(msg->id);
  191. if (msg->dataSize > 0)
  192. writer.AddAlignedByteArray(msg->data, msg->dataSize);
  193. ++numMessagesPacked;
  194. serializedMessages.push_back(msg);
  195. #ifdef KNET_NO_MAXHEAP
  196. assert(*outboundQueue.Front() == msg);
  197. #else
  198. assert(outboundQueue.Front() == msg);
  199. #endif
  200. outboundQueue.PopFront();
  201. }
  202. // assert(ContainerUniqueAndNoNullElements(serializedMessages)); // This precondition should always hold (but very heavy to test, uncomment to debug)
  203. if (writer.BytesFilled() == 0 && outboundQueue.Size() > 0)
  204. LOG(LogError, "Failed to send any messages to socket %s! (Probably next message was too big to fit in the buffer).", socket->ToString().c_str());
  205. overlappedTransfer->bytesContains = writer.BytesFilled();
  206. bool success = socket->EndSend(overlappedTransfer);
  207. if (!success) // If we failed to send, put all the messages back into the outbound queue to wait for the next send round.
  208. {
  209. for(size_t i = 0; i < serializedMessages.size(); ++i)
  210. #ifdef KNET_NO_MAXHEAP
  211. outboundQueue.InsertWithResize(serializedMessages[i]);
  212. #else
  213. outboundQueue.Insert(serializedMessages[i]);
  214. #endif
  215. // assert(ContainerUniqueAndNoNullElements(outboundQueue));
  216. LOG(LogError, "TCPMessageConnection::SendOutPacket() failed: Could not initiate overlapped transfer!");
  217. return PacketSendSocketFull;
  218. }
  219. LOG(LogData, "TCPMessageConnection::SendOutPacket: Sent %d bytes (%d messages) to peer %s.", (int)writer.BytesFilled(), (int)serializedMessages.size(), socket->ToString().c_str());
  220. AddOutboundStats(writer.BytesFilled(), 1, numMessagesPacked);
  221. ADDEVENT("tcpDataOut", (float)writer.BytesFilled(), "bytes");
  222. // The messages in serializedMessages array are now in the TCP driver to handle. It will guarantee
  223. // delivery if possible, so we can free the messages already.
  224. for(size_t i = 0; i < serializedMessages.size(); ++i)
  225. {
  226. #ifdef KNET_NETWORK_PROFILING
  227. std::stringstream ss;
  228. if (!serializedMessages[i]->profilerName.empty())
  229. ss << "messageOut." << serializedMessages[i]->profilerName;
  230. else
  231. ss << "messageOut." << serializedMessages[i]->id;
  232. ADDEVENT(ss.str().c_str(), (float)serializedMessages[i]->Size(), "bytes");
  233. #endif
  234. ClearOutboundMessageWithContentID(serializedMessages[i]);
  235. FreeMessage(serializedMessages[i]);
  236. }
  237. return PacketSendOK;
  238. }
  239. void TCPMessageConnection::DoUpdateConnection() // [worker thread]
  240. {
  241. ExtractMessages();
  242. }
  243. void TCPMessageConnection::SendOutPackets()
  244. {
  245. AssertInWorkerThreadContext();
  246. if (!socket || !socket->IsWriteOpen() || !socket->IsOverlappedSendReady())
  247. return;
  248. PacketSendResult result = PacketSendOK;
  249. int maxSends = 500; // Place an arbitrary limit to how many packets we will send at a time.
  250. while(result == PacketSendOK && maxSends-- > 0)
  251. result = SendOutPacket();
  252. // Thread-safely clear the eventMsgsOutAvailable event if we don't have any messages to process.
  253. if (NumOutboundMessagesPending() == 0)
  254. eventMsgsOutAvailable.Reset();
  255. if (NumOutboundMessagesPending() > 0)
  256. eventMsgsOutAvailable.Set();
  257. }
  258. void TCPMessageConnection::ExtractMessages()
  259. {
  260. AssertInWorkerThreadContext();
  261. try
  262. {
  263. size_t numMessagesReceived = 0;
  264. for(;;)
  265. {
  266. if (tcpInboundSocketData.Size() == 0) // No new packets in yet.
  267. break;
  268. if (inboundMessageQueue.CapacityLeft() == 0) // If the application can't take in any new messages, abort.
  269. break;
  270. DataDeserializer reader(tcpInboundSocketData.Begin(), tcpInboundSocketData.Size());
  271. u32 messageSize = reader.ReadVLE<VLE8_16_32>();
  272. if (messageSize == DataDeserializer::VLEReadError)
  273. break; // The packet hasn't yet been streamed in.
  274. if (messageSize == 0 || messageSize > cMaxReceivableTCPMessageSize)
  275. {
  276. LOG(LogError, "Received an invalid message size %d!", (int)messageSize);
  277. throw NetException("Malformed TCP data! Received an invalid message size!");
  278. }
  279. if (reader.BytesLeft() < messageSize)
  280. break; // We haven't yet received the whole message, have to abort parsing for now and wait for the whole message.
  281. HandleInboundMessage(0, reader.CurrentData(), messageSize);
  282. reader.SkipBytes(messageSize);
  283. assert(reader.BitPos() == 0);
  284. u32 bytesConsumed = reader.BytePos();
  285. // Erase the bytes we just processed from the ring buffer.
  286. tcpInboundSocketData.Consumed(bytesConsumed);
  287. ++numMessagesReceived;
  288. }
  289. AddInboundStats(0, 0, numMessagesReceived);
  290. } catch(const NetException &e)
  291. {
  292. LOG(LogError, "TCPMessageConnection::ExtractMessages() caught a network exception: \"%s\"!", e.what());
  293. if (socket)
  294. socket->Close();
  295. connectionState = ConnectionClosed;
  296. }
  297. }
  298. void TCPMessageConnection::PerformDisconnection()
  299. {
  300. AssertInMainThreadContext();
  301. if (socket)
  302. socket->Disconnect();
  303. }
  304. void TCPMessageConnection::DumpConnectionStatus() const
  305. {
  306. AssertInMainThreadContext();
  307. char str[2048];
  308. sprintf(str,
  309. "\ttcpInboundSocketData.Capacity(): %d\n"
  310. "\ttcpInboundSocketData.Size(): %d\n"
  311. "\ttcpInboundSocketData.ContiguousFreeBytesLeft(): %d\n",
  312. tcpInboundSocketData.Capacity(), // Note: This accesses a shared variable from the worker thread in a thread-unsafe way, and can crash. Therefore only use this function for debugging.
  313. tcpInboundSocketData.Size(),
  314. tcpInboundSocketData.ContiguousFreeBytesLeft());
  315. LOGUSER(str);
  316. }
  317. unsigned long TCPMessageConnection::TimeUntilCanSendPacket() const
  318. {
  319. // For TCPMessageConnection, this throttling logic is not used. Perhaps will not be used ever, as the
  320. // TCP driver does all send throttling already.
  321. return 0;
  322. }
  323. } // ~kNet