MessageConnection.h 33 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609
  1. /* Copyright The kNet Project.
  2. Licensed under the Apache License, Version 2.0 (the "License");
  3. you may not use this file except in compliance with the License.
  4. You may obtain a copy of the License at
  5. http://www.apache.org/licenses/LICENSE-2.0
  6. Unless required by applicable law or agreed to in writing, software
  7. distributed under the License is distributed on an "AS IS" BASIS,
  8. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  9. See the License for the specific language governing permissions and
  10. limitations under the License. */
  11. #pragma once
  12. /** @file MessageConnection.h
  13. @brief The MessageConnection and ConnectionStatistics classes. */
  14. // Modified by Lasse Oorni for Urho3D
  15. #include <vector>
  16. #include <map>
  17. #include <utility>
  18. #include <set>
  19. #include "kNetBuildConfig.h"
  20. // Urho3D: include Socket.h first to make sure WS2Include.h is included before windows.h / winsock.h
  21. #include "Socket.h"
  22. #include "WaitFreeQueue.h"
  23. #include "LockFreePoolAllocator.h"
  24. #include "Lockable.h"
  25. #include "NetworkSimulator.h"
  26. #include "IMessageHandler.h"
  27. #include "BasicSerializedDataTypes.h"
  28. #include "Datagram.h"
  29. #include "FragmentedTransferManager.h"
  30. #include "NetworkMessage.h"
  31. #include "Event.h"
  32. #include "DataSerializer.h"
  33. #include "DataDeserializer.h"
  34. #include "MaxHeap.h"
  35. #include "Clock.h"
  36. #include "PolledTimer.h"
  37. #include "Thread.h"
  38. #include "Types.h"
  39. namespace kNet
  40. {
  41. class MessageConnection;
  42. class UDPMessageConnection;
  43. class TCPMessageConnection;
  44. class NetworkServer;
  45. class Network;
  46. class NetworkWorkerThread;
  47. class FragmentedSendManager;
  48. #ifdef _MSC_VER
  49. struct FragmentedSendManager::FragmentedTransfer;
  50. #endif
  51. /// Stores information about an established MessageConnection.
  52. struct ConnectionStatistics
  53. {
  54. /// Remembers a ping request that was sent to the other end.
  55. struct PingTrack
  56. {
  57. tick_t pingSentTick; ///< Timestamp of when the PingRequest was sent.
  58. tick_t pingReplyTick; ///< If replyReceived==true, contains the timestamp of when PingReply was received as a response.
  59. unsigned long pingID; ///< ID of this ping message.
  60. bool replyReceived; ///< True of PingReply has already been received for this.
  61. };
  62. /// Contains an entry for each recently performed Ping operation, sorted by age (oldest first).
  63. std::vector<PingTrack> ping;
  64. /// Remembers both in- and outbound traffic events on the socket.
  65. struct TrafficTrack
  66. {
  67. tick_t tick; ///< Denotes when this event occurred.
  68. unsigned long packetsIn; ///< The number of datagrams in when this event occurred.
  69. unsigned long packetsOut; ///< The number of datagrams out when this event occurred.
  70. unsigned long messagesIn; ///< The total number of messages the received datagrams contained.
  71. unsigned long messagesOut; ///< The total number of messages the sent datagrams contained.
  72. unsigned long bytesIn; ///< The total number of bytes the received datagrams contained.
  73. unsigned long bytesOut; ///< The total number of bytes the sent datagrams contained.
  74. };
  75. /// Contains an entry for each recent traffic event (data in/out) on the connection, sorted by age (oldest first).
  76. std::vector<TrafficTrack> traffic;
  77. /// Remembers the send/receive time of a datagram with a certain ID.
  78. struct DatagramIDTrack
  79. {
  80. tick_t tick;
  81. packet_id_t packetID;
  82. };
  83. /// Contains an entry for each recently received packet, sorted by age (oldest first).
  84. std::vector<DatagramIDTrack> recvPacketIDs;
  85. };
  86. /// Comparison object that sorts the two messages by their priority (higher priority/smaller number first).
  87. class NetworkMessagePriorityCmp
  88. {
  89. public:
  90. int operator ()(const NetworkMessage *a, const NetworkMessage *b)
  91. {
  92. assert(a && b);
  93. if (a->priority < b->priority) return -1;
  94. if (b->priority < a->priority) return 1;
  95. if (a->MessageNumber() < b->MessageNumber()) return 1;
  96. if (b->MessageNumber() < a->MessageNumber()) return -1;
  97. return 0;
  98. }
  99. };
  100. /// Represents the current state of the connection.
  101. enum ConnectionState
  102. {
  103. ConnectionPending, ///< Waiting for the other end to send an acknowledgement packet to form the connection. No messages may yet be sent or received at this state.
  104. ConnectionOK, ///< The connection is bidirectionally open, for both reading and writing. (readOpen=true, writeOpen=true)
  105. ConnectionDisconnecting, ///< We are closing the connection down. Cannot send any more messages, but can still receive. (readOpen=true, writeOpen=false)
  106. ConnectionPeerClosed, ///< The other end has closed the connection. No new messages will be received, but can still send messages. (readOpen=false, writeOpen=true)
  107. ConnectionClosed ///< The socket is no longer open. A MessageConnection object in this state cannot be reused to open a new connection, but a new connection object must be created.
  108. };
  109. /// Returns a textual representation of a ConnectionState.
  110. std::string ConnectionStateToString(ConnectionState state);
  111. // Prevent confusion with Win32 functions
  112. #ifdef SendMessage
  113. #undef SendMessage
  114. #endif
  115. /// Represents a single established network connection. MessageConnection maintains its own worker thread that manages
  116. /// connection control, the scheduling and prioritization of outbound messages, and receiving inbound messages.
  117. class MessageConnection : public RefCountable
  118. {
  119. public:
  120. virtual ~MessageConnection();
  121. /// Returns the current connection state.
  122. ConnectionState GetConnectionState() const; // [main and worker thread]
  123. /// Returns true if the connection is still open for reading, i.e. the peer has not signalled that it will not send any more data (see the readOpen values documented for ConnectionState).
  124. bool IsReadOpen() const; // [main and worker thread]
  125. /// Returns true if the connection is still open for writing, i.e. we have not signalled that we will not send any more data (see the writeOpen values documented for ConnectionState).
  126. bool IsWriteOpen() const; // [main and worker thread]
  127. /// Returns true if the connection is in the ConnectionPending state and waiting for the other end to resolve/establish the connection.
  128. /// When this function returns false, the connection may be half-open, bidirectionally open, timed out on ConnectionPending, or closed.
  129. bool IsPending() const; // [main and worker thread]
  130. /// Returns true if this socket is connected, i.e. at least half-open in one way.
  131. bool Connected() const { return IsReadOpen() || IsWriteOpen(); } // [main and worker thread]
  132. /// Runs a modal processing loop and produces events for all inbound received data. Returns when the connection is closed.
  133. /// This is an example function mostly useful only for very simple demo applications. In most cases,
  134. /// you do not want to call this.
  135. void RunModalClient(); // [main thread]
  136. /// Blocks for the given amount of time until the connection has transitioned away from ConnectionPending state.
  137. /// @param maxMSecsToWait A positive value that indicates the maximum time to wait until returning.
  138. /// @return If the connection was successfully opened, this function returns true. Otherwise returns false, and
  139. /// either timeout was encountered and the other end has not acknowledged the connection,
  140. /// or the connection is in ConnectionClosed state.
  141. bool WaitToEstablishConnection(int maxMSecsToWait = 500); // [main thread]
  142. /// Starts a benign disconnect procedure. Transitions ConnectionState to ConnectionDisconnecting. This
  143. /// function will block until the given period expires or the other end acknowledges and also closes
  144. /// down the connection. Currently no guarantee is given for whether previous reliable messages will
  145. /// safely reach the destination. To ensure this, do a manual wait to flush the outbound message queue
  146. /// before disconnecting.
  147. /// @param maxMSecsToWait A positive number that indicates the maximum time to wait for a disconnect
  148. /// acknowledgement message until returning.
  149. /// If 0 is passed, the function will send the Disconnect message and return immediately.
  150. /// When this function returns, the connection may either be in ConnectionDisconnecting or ConnectionClosed
  151. /// state, depending on whether the other end has already acknowledged the disconnection.
  152. /// \note You may not call this function in middle of StartNewMessage() - EndAndQueueMessage() function calls.
  153. void Disconnect(int maxMSecsToWait = 500); // [main thread]
  154. /// Starts a forceful disconnect procedure.
  155. /// @param maxMSecsToWait If a positive number, Disconnect message will be sent to the peer and if no response
  156. /// is received in the given time period, the connection is forcefully closed.
  157. /// If 0, no Disconnect message will be sent at all, but the connection is torn down
  158. /// and the function returns immediately. The other end will remain hanging and will timeout.
  159. /// When this function returns, the connection is in ConnectionClosed state.
  160. /// \note You may not call this function in middle of StartNewMessage() - EndAndQueueMessage() function calls.
  161. void Close(int maxMSecsToWait = 500); // [main thread]
  162. // There are 3 ways to send messages through a MessageConnection:
  163. // StartNewMessage/EndAndQueueMessage, SendStruct, and Send. See below.
  164. /// Start building a new message with the given ID.
  165. /// @param id The ID for the message you will be sending.
  166. /// @param numBytes The number of bytes the body of this message will be. This function will pre-allocate the
  167. /// NetworkMessage::data field to hold at least that many bytes (Capacity() can also return a larger value).
  168. /// This number only needs to be an estimate, since you can later on call NetworkMessage::Reserve()
  169. /// to reallocate the message memory. If you pass in the default value 0, no pre-allocation will be performed.
  170. /// @return The NetworkMessage object that represents the new message to be built. This message is dynamically allocated
  171. /// from an internal pool of NetworkMessage blocks. For each NetworkMessage pointer obtained, call
  172. /// EndAndQueueMessage when you have finished building the message to commit the network send and to release the memory.
  173. /// Alternatively, if after calling StartNewMessage, you decide to abort the network send, free up the NetworkMessage
  174. /// by calling this->FreeMessage().
  175. NetworkMessage *StartNewMessage(unsigned long id, size_t numBytes = 0); // [main and worker thread]
  176. /// Finishes building the message and submits it to the outbound send queue.
  177. /// @param msg The message to send. After calling this function, this pointer should be considered freed and may not be
  178. /// dereferenced or passed to any other member function of this MessageConnection. Only pass in here
  179. /// NetworkMessage pointers obtained by a call to StartNewMessage() of the same MessageConnection instance.
  180. /// @param numBytes Specify here the number of actual bytes you filled in into the msg.data field. A size of 0
  181. /// is valid, and can be used in cases the message ID itself is the whole message. Passing in the default
  182. /// value of this parameter will use the size value that was specified in the call to StartNewMessage().
  183. /// @param internalQueue If true, specifies that this message was submitted from the network worker thread and not the application
  184. /// thread. Pass in the value 'false' here in the client application, or there is a chance of a race condition.
  185. void EndAndQueueMessage(NetworkMessage *msg, size_t numBytes = (size_t)(-1), bool internalQueue = false); // [main and worker thread]
  186. /// This is a convenience function to access the above StartNewMessage/EndAndQueueMessage pair. The performance of this
  187. /// function call is not as good, since a memcpy of the message will need to be made. For performance-critical messages,
  188. /// it is better to craft the message directly into the buffer area provided by StartNewMessage.
  189. void SendMessage(unsigned long id, bool reliable, bool inOrder, unsigned long priority, unsigned long contentID,
  190. const char *data, size_t numBytes); // [main thread]
  191. /// Sends a message using a serializable structure.
  192. template<typename SerializableData>
  193. void SendStruct(const SerializableData &data, unsigned long id, bool inOrder,
  194. bool reliable, unsigned long priority, unsigned long contentID = 0); // [main thread]
  195. /// Sends a message using a compiled message structure.
  196. template<typename SerializableMessage>
  197. void Send(const SerializableMessage &data, unsigned long contentID = 0); // [main thread]
  198. /// Stops all outbound sends until ResumeOutboundSends is called. Use if you need to guarantee that some messages be sent in the same datagram.
  199. /// Do not stop outbound sends for long periods, or the other end may time out the connection.
  200. void PauseOutboundSends(); // [main thread]
  201. /// Resumes sending of outbound messages.
  202. void ResumeOutboundSends(); // [main thread]
  203. /// Returns the number of messages that have been received from the network but haven't been handled by the application yet.
  204. size_t NumInboundMessagesPending() const { return inboundMessageQueue.Size(); } // [main and worker thread]
  205. /// Returns the total number of messages pending to be sent out.
  206. size_t NumOutboundMessagesPending() const { return outboundQueue.Size() + outboundAcceptQueue.Size(); } // [main and worker thread]
  207. /// Returns the number of outbound messages the main thread has queued for the worker thread to send out. (still unaccepted by the worker thread).
  208. size_t OutboundAcceptQueueSize() const { return outboundAcceptQueue.Size(); } // [main and worker thread]
  209. /// Returns the number of outbound messages in the worker thread outbound message queue (already accepted and pending a send by the worker thread).
  210. size_t OutboundQueueSize() const { return outboundQueue.Size(); } // [main and worker thread]
  211. /// Returns the underlying raw socket. [main and worker thread]
  212. Socket *GetSocket() { return socket; }
  213. /// Returns an object that identifies the local endpoint (IP and port) this connection is connected to.
  214. EndPoint LocalEndPoint() const; // [main and worker thread]
  215. /// Returns an object that identifies the remote endpoint (IP and port) this connection is connected to.
  216. EndPoint RemoteEndPoint() const; // [main and worker thread]
  217. /// Sets an upper limit to the data send rate for this connection.
  218. /// The default is not to have an upper limit at all.
  219. /// @param numBytesPerSec The upper limit for the number of bytes to send per second. This limit includes the message header
  220. /// bytes as well and not just the payload. Set to 0 to force no limit.
  221. /// @param numDatagramsPerSec The maximum number of datagrams (UDP packets) to send per second. Set to 0 to force no limit.
  222. /// If the connection is operating on top of TCP, this field has no effect.
  223. ///\todo Implement.
  224. void SetMaximumDataSendRate(int numBytesPerSec, int numDatagramsPerSec);
  225. /// Registers a new listener object for the events of this connection.
  226. void RegisterInboundMessageHandler(IMessageHandler *handler); // [main thread]
  227. /// Fetches all newly received messages waiting in the inbound queue, and passes each of these
  228. /// to the message handler registered using RegisterInboundMessageHandler.
  229. /// Call this function periodically to receive new data from the network if you are using the Observer pattern.
  230. /// Alternatively, use the immediate-mode ReceiveMessage function to receive messages directly one at a time.
  231. /// @param maxMessagesToProcess If the inbound queue contains more than this amount of new messages,
  232. /// the processing loop will return to give processing time to other parts of the application.
  233. /// If 0 is passed, messages are processed until the queue is empty.
  234. /// \note It is important to have a non-zero limit in maxMessagesToProcess (unless you're sure what you are doing), since
  235. /// otherwise an attacker might affect the performance of the application main loop by sending messages so fast that
  236. /// the queue never has time to exhaust, thus giving an infinite loop in practice.
  237. void Process(int maxMessagesToProcess = 100); // [main thread]
  238. /// Waits for at most the given amount of time until a new message is received for processing.
  239. /// @param maxMSecsToWait If 0, the call will wait indefinitely until a message is received or the connection transitions to
  240. /// closing state.
  241. /// If a positive value is passed, at most that many milliseconds is waited for a new message to be received.
  242. void WaitForMessage(int maxMSecsToWait); // [main thread]
  243. /// Returns the next message in the inbound queue. This is an alternative API to RegisterInboundMessageHandler/Process.
  244. /// \note When using this function to receive messages, remember to call FreeMessage for each NetworkMessage returned, or you
  245. /// will very quickly leak a large amount of memory.
  246. /// @param maxMSecsToWait If a negative number, the call will not wait at all if there are no new messages to process, but
  247. /// returns 0 immediately.
  248. /// If 0, the call will wait indefinitely until a message is received or the connection transitions to
  249. /// closing state.
  250. /// If a positive value is passed, at most that many milliseconds is waited for a new message to be received.
  251. /// @return A newly allocated object to the received message, or 0 if the queue was empty and no messages were received during
  252. /// the wait period, or if the connection transitioned to closing state. When you are finished reading the message,
  253. /// call FreeMessage for the returned pointer.
  254. NetworkMessage *ReceiveMessage(int maxMSecsToWait = -1); // [main thread]
  255. /// Frees up a NetworkMessage struct when it is no longer needed.
  256. /// You need to call this for each message that you received from a call to ReceiveMessage.
  257. void FreeMessage(NetworkMessage *msg); // [main and worker thread]
  258. /// Returns a single-line message describing the connection state.
  259. std::string ToString() const; // [main and worker thread]
  260. /// Dumps a long multi-line status message of this connection state to stdout.
  261. void DumpStatus() const; // [main thread]
  262. // MessageConnection Statistics -related functions:
  263. /// Returns the estimated RTT of the connection, in milliseconds. RTT is the time taken to communicate a message from client->host->client.
  264. float RoundTripTime() const { return rtt; } // [main and worker thread]
  265. /// Returns the number of milliseconds since we last received data from the socket.
  266. float LastHeardTime() const { return Clock::TicksToMillisecondsF(Clock::TicksInBetween(Clock::Tick(), lastHeardTime)); } // [main and worker thread]
  267. float PacketsInPerSec() const { return packetsInPerSec; } // [main and worker thread]
  268. float PacketsOutPerSec() const { return packetsOutPerSec; } // [main and worker thread]
  269. float MsgsInPerSec() const { return msgsInPerSec; } // [main and worker thread]
  270. float MsgsOutPerSec() const { return msgsOutPerSec; } // [main and worker thread]
  271. float BytesInPerSec() const { return bytesInPerSec; } // [main and worker thread]
  272. float BytesOutPerSec() const { return bytesOutPerSec; } // [main and worker thread]
  273. /// Returns the total number of bytes (excluding IP and TCP/UDP headers) that have been received from this connection.
  274. u64 BytesInTotal() const { return bytesInTotal; } // [main and worker thread]
  275. /// Returns the total number of bytes (excluding IP and TCP/UDP headers) that have been sent from this connection.
  276. u64 BytesOutTotal() const { return bytesOutTotal; } // [main and worker thread]
  277. /// Returns the simulator object which can be used to apply network condition simulations to this connection.
  278. NetworkSimulator &NetworkSendSimulator() { return networkSendSimulator; }
  279. /// Stores all the statistics about the current connection. This data is periodically recomputed
  280. /// by the network worker thread and shared to the client through a lock.
  281. Lockable<ConnectionStatistics> statistics; // [main and worker thread]
  282. protected:
  283. friend class NetworkWorkerThread;
  284. /// The Network object inside which this MessageConnection lives.
  285. Network *owner; // [set and read only by the main thread]
  286. /// If this MessageConnection represents a client connection on the server side, this gives the owner.
  287. NetworkServer *ownerServer; // [set and read only by the main thread]
  288. /// Stores the thread that manages the background processing of this connection. The same thread can manage multiple
  289. /// connections and servers, and not just this one.
  290. NetworkWorkerThread *workerThread; // [set and read only by worker thread]
  291. #ifdef KNET_THREAD_CHECKING_ENABLED
  292. /// In debug mode, we track and enforce thread safety constraints through this ID.
  293. ThreadId workerThreadId; // [set by worker thread on thread startup, read by both main and worker thread]
  294. #endif
  295. /// Performs a check that asserts that the current thread of execution is in the network worker thread.
  296. void AssertInWorkerThreadContext() const; // [main and worker thread]
  297. /// Performs a check that asserts that the current thread of execution is not in the network worker thread (it is the main thread).
  298. void AssertInMainThreadContext() const; // [main and worker thread]
  299. /// Returns true if this MessageConnection is associated with a NetworkWorkerThread to maintain.
  300. bool IsWorkerThreadRunning() const { return workerThread != 0; } // [main and worker thread]
  301. /// A queue populated by the main thread to give out messages to the MessageConnection work thread to process.
  302. WaitFreeQueue<NetworkMessage*> outboundAcceptQueue; // [produced by main thread, consumed by worker thread]
  303. /// A queue populated by the networking thread to hold all the incoming messages until the application can process them.
  304. WaitFreeQueue<NetworkMessage*> inboundMessageQueue; // [produced by worker thread, consumed by main thread]
  305. /// A priority queue that maintains in order all the messages that are going out the pipe.
  306. ///\todo Make the choice of which of the following structures to use a runtime option.
  307. #ifndef KNET_NO_MAXHEAP // If defined, disables message priorization feature to improve client-side CPU performance. By default disabled.
  308. MaxHeap<NetworkMessage*, NetworkMessagePriorityCmp> outboundQueue; // [worker thread]
  309. #else
  310. WaitFreeQueue<NetworkMessage*> outboundQueue; // [worker thread]
  311. #endif
  312. /// Tracks all the message sends that are fragmented.
  313. Lockable<FragmentedSendManager> fragmentedSends; // [worker thread]
  314. /// Tracks all the receives of fragmented messages and helps reconstruct the original messages from fragments.
  315. FragmentedReceiveManager fragmentedReceives; // [worker thread]
  316. /// Allocations of NetworkMessage structures go through a pool to avoid dynamic new/delete calls when sending messages.
  317. /// This structure is shared between the main and worker thread through a lockfree construct.
  318. LockFreePoolAllocator<NetworkMessage> messagePool; // [main and worker thread]
  319. /// Tracks when it is time to send the next PingRequest to the peer.
  320. PolledTimer pingTimer; // [worker thread]
  321. /// Tracks when it is time to update the statistics structure.
  322. PolledTimer statsRefreshTimer; // [worker thread]
  323. /// Specifies the return value for the functions that send out network packets.
  324. enum PacketSendResult
  325. {
  326. PacketSendOK, ///< The operating system signalled the packet was successfully sent.
  327. PacketSendSocketClosed, ///< The packet could not be sent, since the socket was closed.
  328. PacketSendSocketFull, ///< The packet could not be sent, since the OS outbound buffer was full.
  329. PacketSendNoMessages, ///< A packet could not be sent, since there was nothing to send.
  330. PacketSendThrottled ///< The packet could not be sent right now, since a throttle timer is in effect.
  331. };
  332. /// Serializes several messages into a single UDP/TCP packet and sends it out to the wire.
  333. virtual PacketSendResult SendOutPacket() = 0; // [worker thread]
  334. /// Sends out as many packets at one go as is allowed by the current send rate of the connection.
  335. virtual void SendOutPackets() = 0; // [worker thread]
  336. /// Returns how many milliseconds need to be waited before this socket can try sending data the next time.
  337. virtual unsigned long TimeUntilCanSendPacket() const = 0; // [worker thread]
  338. /// Performs the internal work tick that updates this connection.
  339. void UpdateConnection(); // [worker thread]
  340. /// Overridden by a subclass of MessageConnection to do protocol-specific updates (private implementation -pattern)
  341. virtual void DoUpdateConnection() {} // [worker thread]
  342. /// Marks that the peer has closed the connection and will not send any more application-level data.
  343. void SetPeerClosed(); // [worker thread]
  344. virtual void DumpConnectionStatus() const {} // [main thread]
  345. /// Posted when the application has pushed us some messages to handle.
  346. Event NewOutboundMessagesEvent() const; // [main and worker thread]
  347. /// Specifies the result of a Socket read activity.
  348. enum SocketReadResult
  349. {
  350. SocketReadOK, ///< All data was read from the socket and it is empty for now.
  351. SocketReadError, ///< An error occurred - probably the connection is dead.
  352. SocketReadThrottled, ///< There was so much data to read that we need to pause and make room for sends as well.
  353. };
  354. /// Reads all the new bytes available in the socket.
  355. /// This data will be read into the connection's internal data queue, where it will be
  356. /// parsed to messages.
  357. /// @param bytesRead [out] This field will get the number of bytes successfully read.
  358. /// @return The return code of the operation.
  359. virtual SocketReadResult ReadSocket(size_t &bytesRead) = 0; // [worker thread]
  360. SocketReadResult ReadSocket(); // [worker thread]
  361. /// Sets the worker thread object that will handle this connection.
  362. void SetWorkerThread(NetworkWorkerThread *thread); // [main thread]
  363. NetworkWorkerThread *WorkerThread() const { return workerThread; }
  364. void HandleInboundMessage(packet_id_t packetID, const char *data, size_t numBytes); // [worker thread]
  365. /// Allocates a new NetworkMessage struct. [both worker and main thread]
  366. NetworkMessage *AllocateNewMessage();
  367. // Ping/RTT management operations:
  368. void SendPingRequestMessage(bool internalQueue); // [main or worker thread]
  369. void HandlePingRequestMessage(const char *data, size_t numBytes); // [worker thread]
  370. void HandlePingReplyMessage(const char *data, size_t numBytes); // [worker thread]
  371. // Frees all internal dynamically allocated message data.
  372. void FreeMessageData(); // [main thread]
  373. /// Checks if the connection has been silent too long and has now timed out.
  374. void DetectConnectionTimeOut(); // [worker thread]
  375. /// Refreshes RTT and other connection related statistics.
  376. void ComputeStats(); // [worker thread]
  377. /// Adds a new entry for outbound data statistics.
  378. void AddOutboundStats(unsigned long numBytes, unsigned long numPackets, unsigned long numMessages); // [worker thread]
  379. /// Adds a new entry for inbound data statistics.
  380. void AddInboundStats(unsigned long numBytes, unsigned long numPackets, unsigned long numMessages); // [worker thread]
  381. /// Pulls in all new messages from the main thread to the worker thread side and admits them to the send priority queue.
  382. void AcceptOutboundMessages(); // [worker thread]
  383. /// Starts the socket-specific disconnection procedure.
  384. virtual void PerformDisconnection() = 0;
  385. /// The object that receives notifications of all received data.
  386. IMessageHandler *inboundMessageHandler; // [main thread]
  387. /// The underlying socket on top of which this connection operates.
  388. Socket *socket; // [set by main thread before the worker thread is running. Read-only when worker thread is running. Read by main and worker thread]
  389. /// Specifies the current connection state.
  390. ConnectionState connectionState; // [main and worker thread]
  391. /// If true, all sends to the socket are on hold, until ResumeOutboundSends() is called.
  392. bool bOutboundSendsPaused; // [set by main thread, read by worker thread]
  393. friend class NetworkServer; // Grants NetworkServer access to the non-public members of this class.
  394. friend class Network; // Grants Network access to the non-public members of this class.
  395. /// Posted when the application has pushed us some messages to handle.
  396. Event eventMsgsOutAvailable; // [main and worker thread]
  397. float rtt; ///< The currently estimated round-trip time, in milliseconds. [main and worker thread]
  398. tick_t lastHeardTime; ///< The tick since last successful receive from the socket. [main and worker thread]
  399. float packetsInPerSec; ///< The average number of datagrams we are receiving/second. [main and worker thread]
  400. float packetsOutPerSec; ///< The average number of datagrams we are sending/second. [main and worker thread]
  401. float msgsInPerSec; ///< The average number of kNet messages we are receiving/second. [main and worker thread]
  402. float msgsOutPerSec; ///< The average number of kNet messages we are sending/second. [main and worker thread]
  403. float bytesInPerSec; ///< The average number of bytes we are receiving/second. This includes kNet headers. [main and worker thread]
  404. float bytesOutPerSec; ///< The average number of bytes we are sending/second. This includes kNet headers. [main and worker thread]
  405. u64 bytesInTotal; ///< NOTE(review): presumably the cumulative number of bytes received on this connection - confirm. No thread annotation is given.
  406. u64 bytesOutTotal; ///< NOTE(review): presumably the cumulative number of bytes sent on this connection - confirm. No thread annotation is given.
  407. /// Stores the current settings related to network conditions testing.
  408. /// By default, the simulator is disabled.
  409. NetworkSimulator networkSendSimulator;
  410. /// A running number attached to each outbound message (not present in network stream) to
  411. /// break ties when deducing which message should come before which.
  412. unsigned long outboundMessageNumberCounter; // [worker thread]
  413. /// A running number that is assigned to each outbound reliable message. This is used to
  414. /// enforce proper ordering of ordered messages.
  415. unsigned long outboundReliableMessageNumberCounter; // [worker thread]
  416. /// A (messageID, contentID) pair.
  417. typedef std::pair<u32, u32> MsgContentIDPair;
  418. typedef std::map<MsgContentIDPair, std::pair<packet_id_t, tick_t> > ContentIDReceiveTrack; ///< Maps a (messageID, contentID) pair to a (packet_id_t, tick_t) pair.
  419. /// Each (messageID, contentID) pair has a packetID "stamp" associated to them to track
  420. /// and decimate out-of-order received obsoleted messages.
  421. ContentIDReceiveTrack inboundContentIDStamps; // [worker thread]
  422. typedef std::map<MsgContentIDPair, NetworkMessage*> ContentIDSendTrack; ///< Maps a (messageID, contentID) pair to a NetworkMessage pointer.
  423. ContentIDSendTrack outboundContentIDMessages; // [worker thread] NOTE(review): presumably the outbound counterpart of inboundContentIDStamps - confirm, including who owns the stored pointers.
  424. void CheckAndSaveOutboundMessageWithContentID(NetworkMessage *msg); // [worker thread] NOTE(review): undocumented; presumably records msg in outboundContentIDMessages - confirm.
  425. void ClearOutboundMessageWithContentID(NetworkMessage *msg); // [worker thread] NOTE(review): undocumented; presumably removes the record for msg from outboundContentIDMessages - confirm.
  426. /// Checks whether the given (messageID, contentID)-pair is already out-of-date and obsoleted
  427. /// by a newer packet and should not be processed.
  428. /// @return True if the packet should be processed (there was no superseding record), and
  429. /// false if the packet is old and should be discarded.
  430. bool CheckAndSaveContentIDStamp(message_id_t messageID, u32 contentID, packet_id_t packetID); // [worker thread]
  431. void SplitAndQueueMessage(NetworkMessage *message, bool internalQueue, size_t maxFragmentSize); // [main and worker thread] NOTE(review): undocumented; presumably fragments message into pieces of at most maxFragmentSize bytes before queueing - confirm.
  432. static const unsigned long MsgIdPingRequest = 1; // NOTE(review): the MsgId* constants below are presumably message IDs reserved for protocol-internal messages - confirm.
  433. static const unsigned long MsgIdPingReply = 2;
  434. static const unsigned long MsgIdFlowControlRequest = 3;
  435. static const unsigned long MsgIdPacketAck = 4;
  436. static const unsigned long MsgIdDisconnect = 0x3FFFFFFF; // Note the large gap: the two disconnect IDs sit at 0x3FFFFFFF and 0x3FFFFFFE, far from the small IDs above.
  437. static const unsigned long MsgIdDisconnectAck = 0x3FFFFFFE;
  438. /// Private ctor - MessageConnections are instantiated by Network and NetworkServer classes.
  439. explicit MessageConnection(Network *owner, NetworkServer *ownerServer, Socket *socket, ConnectionState startingState); // NOTE(review): the comment above says "Private", but this declaration precedes the private: label below - confirm the actual access level.
  440. virtual bool HandleMessage(packet_id_t /*packetID*/, message_id_t /*messageID*/, const char * /*data*/, size_t /*numBytes*/) { return false; } // [main thread] Default implementation ignores all of its arguments and returns false. NOTE(review): presumably the return value tells the caller whether the message was handled - confirm.
  441. private:
  442. void operator=(const MessageConnection &); ///< Noncopyable, N/I.
  443. MessageConnection(const MessageConnection &); ///< Noncopyable, N/I.
  444. };
  445. template<typename SerializableData>
  446. void MessageConnection::SendStruct(const SerializableData &data, unsigned long id, bool inOrder,
  447. bool reliable, unsigned long priority, unsigned long contentID)
  448. {
  449. AssertInMainThreadContext();
  450. const size_t dataSize = data.Size();
  451. NetworkMessage *msg = StartNewMessage(id, dataSize);
  452. if (dataSize > 0)
  453. {
  454. DataSerializer mb(msg->data, dataSize);
  455. data.SerializeTo(mb);
  456. assert(mb.BytesFilled() == dataSize); // The SerializableData::Size() estimate must be exact!
  457. }
  458. msg->id = id;
  459. msg->contentID = contentID;
  460. msg->inOrder = inOrder;
  461. msg->priority = priority;
  462. msg->reliable = reliable;
  463. #ifdef KNET_NETWORK_PROFILING
  464. char str[512];
  465. sprintf(str, "%s (%u)", SerializableData::Name(), (unsigned int)id);
  466. msg->profilerName = str;
  467. #endif
  468. EndAndQueueMessage(msg);
  469. }
  470. template<typename SerializableMessage>
  471. void MessageConnection::Send(const SerializableMessage &data, unsigned long contentID)
  472. {
  473. SendStruct(data, SerializableMessage::messageID, data.inOrder, data.reliable, data.priority, contentID);
  474. }
  475. } // ~kNet