AsyncConnection.cpp 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. #include <algorithm>
  2. #include "AsyncConnection.h"
  3. namespace crown
  4. {
  5. namespace network
  6. {
  7. AsyncConnection::AsyncConnection(Allocator& allocator) :
  8. m_sent_msg(allocator),
  9. m_received_msg(allocator),
  10. m_pending_ack(allocator),
  11. m_acked(allocator)
  12. {
  13. }
  14. //-----------------------------------------------------------------------------
  15. AsyncConnection::~AsyncConnection()
  16. {
  17. }
  18. //-----------------------------------------------------------------------------
  19. void AsyncConnection::init(const os::NetAddress addr, const int id)
  20. {
  21. m_remote_address = addr;
  22. m_id = id;
  23. m_max_rate = 64000;
  24. m_outgoing_rate_time = 0;
  25. m_outgoing_rate_bytes = 0;
  26. m_incoming_rate_time = 0;
  27. m_incoming_rate_bytes = 0;
  28. m_incoming_recv_packets = 0.0f;
  29. m_incoming_dropped_packets = 0.0f;
  30. m_incoming_packet_loss_time = 0;
  31. m_outgoing_sent_packet = 0;
  32. m_remote_sequence = 0;
  33. m_local_sequence = 0;
  34. m_max_sequence = 0xFFFFFFFF;
  35. m_max_rtt = 1; //in seconds
  36. m_rtt = 0;
  37. m_last_send_time = 0;
  38. m_last_data_bytes = 0;
  39. // open port
  40. m_socket.open(addr.get_port());
  41. assert(m_socket.is_open());
  42. }
  43. //-----------------------------------------------------------------------------
  44. void AsyncConnection::reset_rate()
  45. {
  46. m_last_send_time = 0;
  47. m_last_data_bytes = 0;
  48. m_outgoing_rate_time = 0;
  49. m_outgoing_rate_bytes = 0;
  50. m_incoming_rate_time = 0;
  51. m_incoming_rate_bytes = 0;
  52. }
  53. //-----------------------------------------------------------------------------
  54. void AsyncConnection::set_max_outgoing_rate(int rate)
  55. {
  56. m_max_rate = rate;
  57. }
  58. //-----------------------------------------------------------------------------
  59. int AsyncConnection::get_max_outgoing_rate()
  60. {
  61. return m_max_rate;
  62. }
  63. //-----------------------------------------------------------------------------
  64. os::NetAddress AsyncConnection::get_remote_address() const
  65. {
  66. return m_remote_address;
  67. }
  68. //-----------------------------------------------------------------------------
  69. int AsyncConnection::get_outgoing_rate() const
  70. {
  71. return m_outgoing_rate_bytes;
  72. }
  73. //-----------------------------------------------------------------------------
  74. int AsyncConnection::get_incoming_rate() const
  75. {
  76. return m_incoming_rate_bytes;
  77. }
  78. //-----------------------------------------------------------------------------
  79. float AsyncConnection::get_incoming_packet_loss() const
  80. {
  81. if (m_incoming_recv_packets == 0 && m_incoming_dropped_packets == 0)
  82. {
  83. return 0.0f;
  84. }
  85. // return loss packet %
  86. return m_incoming_dropped_packets * 100 / (m_incoming_recv_packets + m_incoming_dropped_packets);
  87. }
  88. //-----------------------------------------------------------------------------
  89. void AsyncConnection::send_message(BitMessage& msg, const uint32_t time)
  90. {
  91. m_socket.send(m_remote_address, msg.get_data(), msg.get_size());
  92. //TODO
  93. }
  94. //-----------------------------------------------------------------------------
  95. bool AsyncConnection::receive_message(BitMessage& msg, const uint32_t time)
  96. {
  97. m_socket.receive(m_remote_address, msg.get_data(), msg.get_size());
  98. //TODO
  99. }
  100. //-----------------------------------------------------------------------------
  101. void AsyncConnection::clear_reliable_messages()
  102. {
  103. m_sent_msg.clear();
  104. m_received_msg.clear();
  105. }
  106. //-----------------------------------------------------------------------------
  107. bool AsyncConnection::ready_to_send(const int time) const
  108. {
  109. // if max rate isn't set, send message
  110. if (!m_max_rate)
  111. {
  112. return true;
  113. }
  114. int delta_time;
  115. delta_time = time - m_last_send_time;
  116. if (delta_time > 1000)
  117. {
  118. return true;
  119. }
  120. // if last message wasn't sent, sent it!
  121. return ((m_last_data_bytes - ((delta_time * m_max_rate) / 1000)) <= 0);
  122. }
  123. //-----------------------------------------------------------------------------
  124. bool AsyncConnection::process(const os::NetAddress from, int time, BitMessage &msg, int &sequence)
  125. {
  126. }
  127. //-----------------------------------------------------------------------------
  128. void AsyncConnection::_packet_sent(size_t size)
  129. {
  130. bool seq_exists_in_sent_queue = false;
  131. bool seq_exists_in_pending_ack_queue = false;
  132. BitMessage::Header tmp;
  133. BitMessage::Header* h_ptr;
  134. tmp.sequence = m_local_sequence;
  135. // If local_sequence_number is already in sent_queue
  136. h_ptr = std::find(m_sent_msg.begin(), m_sent_msg.end(), tmp);
  137. if (h_ptr != m_sent_msg.end())
  138. {
  139. seq_exists_in_sent_queue = true;
  140. }
  141. // If local_sequence_number is already in pending_ack_queue
  142. h_ptr = std::find(m_pending_ack.begin(), m_pending_ack.end(), tmp);
  143. if(h_ptr != m_pending_ack.end())
  144. {
  145. seq_exists_in_pending_ack_queue = true;
  146. }
  147. // Else
  148. assert(!seq_exists_in_sent_queue);
  149. assert(!seq_exists_in_pending_ack_queue);
  150. // Creates Header for saving in queues
  151. BitMessage::Header header;
  152. header.sequence = m_local_sequence;
  153. header.time = 0.0f;
  154. header.size = size;
  155. // Push packet data in sent_queue
  156. m_sent_msg.push_back(header);
  157. // push packet data in pending_ack_queue
  158. m_pending_ack.push_back(header);
  159. // Increments sent packet
  160. m_outgoing_sent_packet++;
  161. // Increments local sequence
  162. m_local_sequence++;
  163. if (m_local_sequence > m_max_sequence)
  164. {
  165. m_local_sequence = 0;
  166. }
  167. }
  168. //-----------------------------------------------------------------------------
  169. void AsyncConnection::_packet_received(uint32_t sequence, size_t size)
  170. {
  171. BitMessage::Header tmp;
  172. BitMessage::Header* h_ptr;
  173. tmp.sequence = sequence;
  174. // Increment received packets
  175. m_incoming_recv_packets++;
  176. // If packet's sequence exists, return
  177. h_ptr = std::find(m_received_msg.begin(), m_received_msg.end(), tmp);
  178. if (h_ptr != m_received_msg.end())
  179. {
  180. return;
  181. }
  182. BitMessage::Header header;
  183. header.sequence = sequence;
  184. header.time = 0.0f;
  185. header.size = size;
  186. // Push packet data in received_queue
  187. m_received_msg.push_back(header);
  188. // update m_remote_sequence
  189. if (_sequence_more_recent(sequence, m_remote_sequence))
  190. {
  191. m_remote_sequence = sequence;
  192. }
  193. }
  194. //-----------------------------------------------------------------------------
  195. bool AsyncConnection::_sequence_more_recent(uint32_t s1, uint32_t s2)
  196. {
  197. return ((s1 > s2) && (s1 - s2 <= m_max_sequence / 2)) || ((s2 > s1) && (s2 - s1<= m_max_sequence / 2 ));
  198. }
  199. //-----------------------------------------------------------------------------
  200. uint32_t AsyncConnection::_bit_index_for_sequence(uint32_t seq, uint32_t ack)
  201. {
  202. assert(seq != ack);
  203. assert(!_sequence_more_recent(seq, ack));
  204. if (seq > ack)
  205. {
  206. assert(ack < 33);
  207. assert(seq <= m_max_sequence);
  208. return ack + (m_max_sequence - seq);
  209. }
  210. else
  211. {
  212. assert(ack >= 1);
  213. assert(seq <= ack - 1);
  214. return ack - 1 - seq;
  215. }
  216. }
  217. //-----------------------------------------------------------------------------
  218. uint32_t AsyncConnection::_generate_ack_bits(uint32_t ack)
  219. {
  220. uint32_t ack_bits = 0;
  221. for (BitMessage::Header* i = m_received_msg.begin(); i != m_received_msg.end(); i++)
  222. {
  223. if (i->sequence == ack || _sequence_more_recent(i->sequence, ack))
  224. {
  225. break;
  226. }
  227. uint32_t bit_index = _bit_index_for_sequence(i->sequence, ack);
  228. if (bit_index <= 31)
  229. {
  230. ack_bits |= 1 << bit_index;
  231. }
  232. }
  233. return ack_bits;
  234. }
  235. //-----------------------------------------------------------------------------
  236. void AsyncConnection::_update_outgoing_rate(const uint32_t time, const size_t size)
  237. {
  238. // update the outgoing rate control variables
  239. int delta_time;
  240. delta_time = time - m_last_send_time;
  241. if (delta_time > 1000)
  242. {
  243. m_last_data_bytes = 0;
  244. }
  245. else
  246. {
  247. m_last_data_bytes -= (delta_time * m_max_rate) / 1000;
  248. if ( m_last_data_bytes < 0 )
  249. {
  250. m_last_data_bytes = 0;
  251. }
  252. }
  253. m_last_data_bytes += size;
  254. m_last_send_time = time;
  255. // update outgoing rate variables
  256. if (time - m_outgoing_rate_time > 1000)
  257. {
  258. m_outgoing_rate_bytes -= m_outgoing_rate_bytes * (time - m_outgoing_rate_time - 1000) / 1000;
  259. if (m_outgoing_rate_bytes < 0)
  260. {
  261. m_outgoing_rate_bytes = 0;
  262. }
  263. }
  264. m_outgoing_rate_time = time - 1000;
  265. m_outgoing_rate_bytes += size;
  266. }
  267. //-----------------------------------------------------------------------------
  268. void AsyncConnection::_update_incoming_rate(const uint32_t time, const size_t size)
  269. {
  270. // update incoming rate variables
  271. if (time - m_incoming_rate_time > 1000)
  272. {
  273. m_incoming_rate_bytes -= m_incoming_rate_bytes * (time - m_incoming_rate_time - 1000) / 1000;
  274. if (m_incoming_rate_bytes < 0)
  275. {
  276. m_incoming_rate_bytes = 0;
  277. }
  278. }
  279. m_incoming_rate_time = time - 1000;
  280. m_incoming_rate_bytes += size;
  281. }
  282. //-----------------------------------------------------------------------------
  283. void AsyncConnection::_update_packet_loss(const uint32_t time, const uint32_t num_recv, const uint32_t num_dropped)
  284. {
  285. // update incoming packet loss variables
  286. if (time - m_incoming_packet_loss_time > 5000)
  287. {
  288. float scale = (time - m_incoming_packet_loss_time - 5000) * (1.0f / 5000.0f);
  289. m_incoming_recv_packets -= m_incoming_recv_packets * scale;
  290. if (m_incoming_recv_packets < 0.0f)
  291. {
  292. m_incoming_recv_packets = 0.0f;
  293. }
  294. m_incoming_dropped_packets -= m_incoming_dropped_packets * scale;
  295. if (m_incoming_dropped_packets < 0.0f)
  296. {
  297. m_incoming_dropped_packets = 0.0f;
  298. }
  299. }
  300. m_incoming_packet_loss_time = time - 5000;
  301. m_incoming_recv_packets += num_recv;
  302. m_incoming_dropped_packets += num_dropped;
  303. }
  304. } // namespace network
  305. } // namespace crown