Przeglądaj źródła

connection established and data sending/receiving is good. TODO: fix reliable system and add flow control

mikymod 13 lat temu
rodzic
commit
4cb515658e
3 zmienionych plików z 179 dodań i 194 usunięć
  1. 99 152
      src/network/AsyncConnection.cpp
  2. 52 38
      src/network/AsyncConnection.h
  3. 28 4
      tests/connections.cpp

+ 99 - 152
src/network/AsyncConnection.cpp

@@ -11,26 +11,22 @@ AsyncConnection::AsyncConnection(Allocator& allocator) :
 	m_mode(NONE),
 	m_state(DISCONNECTED),
 	m_max_rate(MAX_RATE),
-	m_outgoing_rate_time(0),
-	m_outgoing_rate_bytes(0),
-	m_incoming_rate_time(0),
-	m_incoming_rate_bytes(0),
-	m_incoming_recv_packets(0.0f),
-	m_incoming_dropped_packets(0.0f),
-	m_incoming_packet_loss_time(0),
-	m_outgoing_sent_packet(0),
+	m_sent_packets(0),
+	m_recv_packets(0),
+	m_dropped_packets(0.0f),
 	m_local_sequence(0),
 	m_remote_sequence(0),
 	m_max_sequence(MAX_SEQUENCE),
 	m_max_rtt(MAX_RTT),	//in milliseconds
-	m_rtt(0),		
-	m_last_send_time(0),
-	m_last_data_bytes(0),
+	m_rtt(0),
+	m_sent_queue(allocator),
+	m_received_queue(allocator),
 	m_running(false),
- 	m_sent_msg(allocator),
- 	m_received_msg(allocator),
- 	m_pending_ack(allocator),
- 	m_acked(allocator)
+	m_sent_packet(allocator),
+	m_received_packet(allocator),
+	m_pending_ack(allocator),
+	m_acked(allocator),
+	m_allocator(&allocator)
 {
   
 }
@@ -115,12 +111,7 @@ void AsyncConnection::connect(const os::NetAddress& addr)
 //-----------------------------------------------------------------------------
 void AsyncConnection::reset_rate()
 {
-  	m_last_send_time = 0;
-	m_last_data_bytes = 0;
-	m_outgoing_rate_time = 0;
-	m_outgoing_rate_bytes = 0;
-	m_incoming_rate_time = 0;
-	m_incoming_rate_bytes = 0;
+
 }
 
 //-----------------------------------------------------------------------------
@@ -144,24 +135,22 @@ os::NetAddress AsyncConnection::get_remote_address() const
 //-----------------------------------------------------------------------------
 int AsyncConnection::get_outgoing_rate() const
 {
-	return m_outgoing_rate_bytes;
 }
 
 //-----------------------------------------------------------------------------
 int AsyncConnection::get_incoming_rate() const
 {
-	return m_incoming_rate_bytes;
 }
 
 //-----------------------------------------------------------------------------
 float AsyncConnection::get_incoming_packet_loss() const
 {
-	if (m_incoming_recv_packets == 0 && m_incoming_dropped_packets == 0)
+	if (m_recv_packets == 0 && m_dropped_packets == 0)
 	{
 		return 0.0f;
 	}
 	// return loss packet %
-	return m_incoming_dropped_packets * 100 / (m_incoming_recv_packets + m_incoming_dropped_packets);
+	return m_dropped_packets * 100 / (m_recv_packets + m_dropped_packets);
 }
 
 //-----------------------------------------------------------------------------
@@ -173,31 +162,59 @@ uint16_t AsyncConnection::get_local_sequence() const
 void AsyncConnection::send_message(BitMessage& msg, const uint32_t time)
 {
 	assert(m_running);
-	
-	m_socket.send(m_remote_address, msg.get_data(), msg.get_size());
-	
-// 	_packet_sent(msg.get_size());
-	
-	m_remote_sequence++;
-
+	// set header
+	msg.set_header(m_id, m_local_sequence, m_remote_sequence, _generate_ack_bits());
+	// evaluate message size	
+	size_t size = msg.get_header_size() + msg.get_size();
+	uint8_t* data = (uint8_t*)m_allocator->allocate(size);
+	// merge header with data 
+	memcpy(data, msg.get_header(), 12);
+	memcpy(data + 12, msg.get_data(), size - 12);
+	// send message
+	m_socket.send(m_remote_address, data, size);
+	// storage outgoing message
+	m_sent_queue.push_back(msg);
 }
 
 //-----------------------------------------------------------------------------
 int32_t AsyncConnection::receive_message(BitMessage& msg, const uint32_t time)
 {
 	assert(m_running);
+	
+	os::NetAddress sender(0, 0, 0, 0, 0);
 
-	msg.begin_writing();
 	size_t size = msg.get_max_size();
-	// NetAddress handler
-	os::NetAddress sender(0, 0, 0, 0, 0);
+	
+	uint8_t* data = (uint8_t*)m_allocator->allocate(msg.get_header_size() + size);
+
 	// receive message
-	int32_t bytes = m_socket.receive(sender, msg.get_data(), size);
+	int32_t received_bytes = m_socket.receive(sender, data, size);
+	if (received_bytes <= 0)
+	{
+		return 0;
+	}
+
+	// header-taking
+	uint32_t protocol_id = data[0] << 24 | data[1] << 16 | data[2] << 8 | data[3];
+	uint16_t sequence = data[4] << 8 | data[5];
+	uint16_t ack = data[6] << 8 | data[7];
+	uint32_t ack_bits = data[8] << 24 | data[9] << 16 | data[10] << 8 | data[11];
+	
+	msg.begin_writing();
+	msg.set_header(protocol_id, sequence, ack, ack_bits);
+	//data-taking
+ 	uint8_t* tmp_ptr = &data[12];
+	memcpy(msg.get_data(), tmp_ptr, msg.get_max_size());
 	msg.set_size(size);
-	// sets BitMessage in only-read
+	
+ 	// storage incoming message	
+	m_received_queue.push_back(msg);
+	
+	// sets BitMessage in read-only for processing
 	msg.begin_reading();
 	
-	if (m_mode == SERVER && !is_connected() && bytes > 0)
+	// establish connection after first packet is received
+	if (m_mode == SERVER && !is_connected())
 	{
 		os::printf("server accepts connection from client ");
 		os::printf("%i.", (uint8_t)sender.address[0]);
@@ -209,7 +226,8 @@ int32_t AsyncConnection::receive_message(BitMessage& msg, const uint32_t time)
 		m_state = CONNECTED;
 		m_remote_address = sender;
 	}
-
+	
+	// completes connection after first packet is received from server
 	if (sender == m_remote_address)
 	{
 		if (m_mode == CLIENT && m_state == CONNECTING)
@@ -226,17 +244,17 @@ int32_t AsyncConnection::receive_message(BitMessage& msg, const uint32_t time)
 //-----------------------------------------------------------------------------
 void AsyncConnection::clear_reliable_messages()
 {
-	m_sent_msg.clear();
-	m_received_msg.clear();
+	m_sent_queue.clear();
+	m_received_queue.clear();
 }
 
 //-----------------------------------------------------------------------------
 void AsyncConnection::update(real delta)
 {
 	assert(m_running);
-	
+	// improve timeout accumulator of delta time
 	m_timeout_acc += delta;
-	
+	// if timeout accumulator > generic timeout
 	if (m_timeout_acc > m_timeout)
 	{
 		if (m_state == CONNECTING)
@@ -254,27 +272,13 @@ void AsyncConnection::update(real delta)
 				m_state = CONNECT_FAIL;
 			}
 		}
-	}	
+	}
 }	
 
 //-----------------------------------------------------------------------------
 bool AsyncConnection::ready_to_send(const int time) const
 {
-	// if max rate isn't set, send message
-	if (!m_max_rate)
-	{
-		return true;
-	}
-	
-	int delta_time;
-	
-	delta_time = time - m_last_send_time;
-	if (delta_time > 1000)
-	{
-		return true;
-	}
-	// if last message wasn't sent, sent it!
-	return ((m_last_data_bytes - ((delta_time * m_max_rate) / 1000)) <= 0);
+
 }
 
 //-----------------------------------------------------------------------------
@@ -303,22 +307,22 @@ bool AsyncConnection::is_connect_fail() const
 
 //-----------------------------------------------------------------------------
 void AsyncConnection::_packet_sent(size_t size)
-{
+{/*
 	bool seq_exists_in_sent_queue = false;
 	bool seq_exists_in_pending_ack_queue = false;
   
-	BitMessage::Header tmp;
-	BitMessage::Header* h_ptr;
+	PacketData tmp;
+	PacketData* h_ptr;
 	
 	tmp.sequence = m_local_sequence;
 
-	// If local_sequence_number is already in sent_queue
-	h_ptr = std::find(m_sent_msg.begin(), m_sent_msg.end(), tmp);
-	if (h_ptr != m_sent_msg.end())
+	// If local_sequence_number exists
+	h_ptr = std::find(m_sent_packet.begin(), m_sent_packet.end(), tmp);
+	if (h_ptr != m_sent_packet.end())
 	{
 		seq_exists_in_sent_queue = true;
 	}
-	// If local_sequence_number is already in pending_ack_queue
+
 	h_ptr = std::find(m_pending_ack.begin(), m_pending_ack.end(), tmp);
 	if(h_ptr != m_pending_ack.end())
 	{
@@ -329,54 +333,54 @@ void AsyncConnection::_packet_sent(size_t size)
  	assert(!seq_exists_in_pending_ack_queue);
 	
 	// Creates Header for saving in queues
-	BitMessage::Header header;
-	header.sequence = m_local_sequence;
-	header.time = 0.0f;
-	header.size = size;
+	PacketData packet;
+	packet.sequence = m_local_sequence;
+	packet.time = 0.0f;
+	packet.size = size;
 	// Push packet data in sent_queue
-	m_sent_msg.push_back(header);
+	m_sent_packet.push_back(packet);
 	// push packet data in pending_ack_queue
-	m_pending_ack.push_back(header);
-	// Increments sent packet
-	m_outgoing_sent_packet++;
+	m_pending_ack.push_back(packet);
+	// Increments sent packets
+	m_sent_packets++;
 	// Increments local sequence
 	m_local_sequence++;
 
 	if (m_local_sequence > m_max_sequence)
 	{
 		m_local_sequence = 0;
-	}
+	}*/
 }
 
 //-----------------------------------------------------------------------------
 void AsyncConnection::_packet_received(uint16_t sequence, size_t size)
-{
-	BitMessage::Header tmp;
-	BitMessage::Header* h_ptr;
+{/*
+	PacketData tmp;
+	PacketData* h_ptr;
 
 	tmp.sequence = sequence;
 
 	// Increment received packets
-	m_incoming_recv_packets++;
+	m_recv_packets++;
 	
 	// If packet's sequence exists, return
-	h_ptr = std::find(m_received_msg.begin(), m_received_msg.end(), tmp);
-	if (h_ptr != m_received_msg.end())
+	h_ptr = std::find(m_received_packet.begin(), m_received_packet.end(), tmp);
+	if (h_ptr != m_received_packet.end())
 	{
 		return;
 	}
 	
-	BitMessage::Header header;
-	header.sequence = sequence;
-	header.time = 0.0f;
-	header.size = size;
+	PacketData packet;
+	packet.sequence = sequence;
+	packet.time = 0.0f;
+	packet.size = size;
 	// Push packet data in received_queue
-	m_received_msg.push_back(header);
+	m_received_packet.push_back(packet);
 	// update m_remote_sequence
 	if (_sequence_more_recent(sequence, m_remote_sequence))
 	{
 		m_remote_sequence = sequence;
-	}  
+	}  */
 }
 
 //-----------------------------------------------------------------------------
@@ -387,7 +391,7 @@ void AsyncConnection::_process_ack(uint16_t ack, int32_t ack_bits)
 		return;
 	}
 	
-	BitMessage::Header* i = m_pending_ack.begin();
+	PacketData* i = m_pending_ack.begin();
 	while (i != m_pending_ack.end())
 	{
 		bool acked = false;
@@ -416,6 +420,7 @@ void AsyncConnection::_process_ack(uint16_t ack, int32_t ack_bits)
 			++i;
 		}
 	}  
+	
 	m_pending_ack.clear();
 }
 
@@ -447,18 +452,18 @@ int32_t AsyncConnection::_bit_index_for_sequence(uint16_t seq, uint16_t ack)
 }
 
 //-----------------------------------------------------------------------------
-int32_t AsyncConnection::_generate_ack_bits(uint16_t ack)
+int32_t AsyncConnection::_generate_ack_bits()
 {
 	int32_t ack_bits = 0;
 	
-	for (BitMessage::Header* i = m_received_msg.begin(); i != m_received_msg.end(); i++)
+	for (PacketData* i = m_received_packet.begin(); i != m_received_packet.end(); i++)
 	{
-		if (i->sequence == ack || _sequence_more_recent(i->sequence, ack))
+		if (i->sequence == m_remote_sequence || _sequence_more_recent(i->sequence, m_remote_sequence))
 		{
 			break;
 		}
 		
-        int32_t bit_index = _bit_index_for_sequence(i->sequence, ack);
+        int32_t bit_index = _bit_index_for_sequence(i->sequence, m_remote_sequence);
 		if (bit_index <= 31)
 		{
 			ack_bits |= 1 << bit_index;
@@ -470,76 +475,18 @@ int32_t AsyncConnection::_generate_ack_bits(uint16_t ack)
 //-----------------------------------------------------------------------------
 void AsyncConnection::_update_outgoing_rate(const uint32_t time, const size_t size)
 {
-	// update the outgoing rate control variables
-	int delta_time;
-	delta_time = time - m_last_send_time;
-	if (delta_time > 1000) 
-	{
-		m_last_data_bytes = 0;
-	}
-	else 
-	{
-		m_last_data_bytes -= (delta_time * m_max_rate) / 1000;
-		if ( m_last_data_bytes < 0 ) 
-		{
-			m_last_data_bytes = 0;
-		}
-	}
-	
-	m_last_data_bytes += size;
-	m_last_send_time = time;  
-	
-	// update outgoing rate variables
-	if (time - m_outgoing_rate_time > 1000) 
-	{
-		m_outgoing_rate_bytes -= m_outgoing_rate_bytes * (time - m_outgoing_rate_time - 1000) / 1000;
-		if (m_outgoing_rate_bytes < 0) 
-		{
-			m_outgoing_rate_bytes = 0;
-		}
-	}
-	
-	m_outgoing_rate_time = time - 1000;
-	m_outgoing_rate_bytes += size;
+
 }
 
 //-----------------------------------------------------------------------------
 void AsyncConnection::_update_incoming_rate(const uint32_t time, const size_t size)
 {
-	// update incoming rate variables
-	if (time - m_incoming_rate_time > 1000) 
-	{
-		m_incoming_rate_bytes -= m_incoming_rate_bytes * (time - m_incoming_rate_time - 1000) / 1000;
-		if (m_incoming_rate_bytes < 0) 
-		{
-			m_incoming_rate_bytes = 0;
-		}
-	}
-	m_incoming_rate_time = time - 1000;
-	m_incoming_rate_bytes += size;  
+ 
 }
 
 //-----------------------------------------------------------------------------
 void AsyncConnection::_update_packet_loss(const uint32_t time, const uint32_t num_recv, const uint32_t num_dropped)
 {
-	// update incoming packet loss variables
-	if (time - m_incoming_packet_loss_time > 5000) 
-	{
-		float scale = (time - m_incoming_packet_loss_time - 5000) * (1.0f / 5000.0f);
-		m_incoming_recv_packets -= m_incoming_recv_packets * scale;
-		if (m_incoming_recv_packets < 0.0f) 
-		{
-			m_incoming_recv_packets = 0.0f;
-		}
-		m_incoming_dropped_packets -= m_incoming_dropped_packets * scale;
-		if (m_incoming_dropped_packets < 0.0f) 
-		{
-			m_incoming_dropped_packets = 0.0f;
-		}
-	}
-	m_incoming_packet_loss_time = time - 5000;
-	m_incoming_recv_packets += num_recv;
-	m_incoming_dropped_packets += num_dropped;
 }
 
 void AsyncConnection::_clear_data()

+ 52 - 38
src/network/AsyncConnection.h

@@ -6,26 +6,37 @@
 #include "BitMessage.h"
 #include "Queue.h"
 
-#define DEFAULT_PROTOCOL_ID 0xFFFFFFFF
-#define DEFAULT_TIMEOUT		10.0f
-#define MAX_RATE			64000
-#define MAX_SEQUENCE		0xFFFFFFFF
-#define MAX_RTT				1000
-#define MAX_PACKET_LEN		1400
-#define MAX_MESSAGE_SIZE	16384
-#define MAX_QUEUE_SIZE		16384
-
 namespace crown
 {
 namespace network
 {
 
-	
 /**
  * Reliable connection over UDP
  */
 class AsyncConnection
 {
+public:
+  
+	struct PacketData
+	{
+		  uint32_t 	sequence;
+		  real		time;
+		  size_t	size;
+		  
+		  PacketData()
+		  {
+			  sequence = 0;
+			  time = 0.0f;
+			  size = 0;
+		  }
+		  
+		  bool operator==(const PacketData& packet)
+		  {
+			  return sequence == packet.sequence;
+		  }
+	};
+  
 public:
 
 									AsyncConnection(Allocator& allocator);
@@ -98,11 +109,11 @@ private:
 	void 							_process_ack(uint16_t ack, int32_t ack_bits);
 	bool							_sequence_more_recent(uint16_t s1, uint16_t s2);
 	int32_t							_bit_index_for_sequence(uint16_t seq, uint16_t ack);
-	int32_t							_generate_ack_bits(uint16_t ack);
+	int32_t							_generate_ack_bits();
 									// methods which provides a flow control system
-	void							_update_outgoing_rate(const uint32_t time, const size_t size);
-	void							_update_incoming_rate(const uint32_t time, const size_t size);
-	void							_update_packet_loss(const uint32_t time, const uint32_t num_recv, const uint32_t num_dropped);
+	void							_update_outgoing_rate(const uint32_t delta, const size_t size);
+	void							_update_incoming_rate(const uint32_t delta, const size_t size);
+	void							_update_packet_loss(const uint32_t delta, const uint32_t num_recv, const uint32_t num_dropped);
 	
 	void							_clear_data();
 
@@ -111,23 +122,15 @@ private:
   
 	os::NetAddress					m_remote_address;			// address of remote host
 	os::UDPSocket					m_socket;					// socket
-	int32_t							m_id;						// our identification used instead of port number
+	uint32_t						m_id;						// our identification used instead of port number
 	Mode							m_mode;						// connection mode
 	State							m_state;					// connection state
-	int32_t							m_max_rate;					// maximum number of bytes that may go out per second
-
-									// variables to keep track of the rate
-	int32_t							m_outgoing_rate_time;		// outgoing time rate
-	int32_t							m_outgoing_rate_bytes;		// outgoing bytes rate
-	int32_t							m_incoming_rate_time;		// incoming time rate
-	int32_t							m_incoming_rate_bytes;		// incoming bytes rate
+	uint32_t						m_max_rate;					// maximum number of bytes that may go out per second
 
 									// variables to keep track of the incoming packet loss
-	real							m_incoming_recv_packets;
-	real							m_incoming_dropped_packets;
-	int32_t							m_incoming_packet_loss_time;
-
-	int32_t							m_outgoing_sent_packet;		// keep track of sent packet
+	uint32_t						m_sent_packets;		
+	uint32_t						m_recv_packets;
+	real							m_dropped_packets;
 
 									// sequencing variables
 	uint16_t						m_local_sequence;
@@ -139,19 +142,30 @@ private:
 	real							m_timeout;					// connection timeout value
 	real							m_timeout_acc;				// timeout accumulator
 	
-									// variables to control the outgoing rate
-	int32_t							m_last_send_time;			// last time data was sent out
-	int32_t							m_last_data_bytes;			// bytes left to send at last send time
-									
+									// reliable message queues
+	Queue<BitMessage>				m_sent_queue;
+	Queue<BitMessage>				m_received_queue;
+	
 									// flag variables
 	bool							m_running;
-									// reliable messages
-	Queue<BitMessage::Header>		m_sent_msg;					// Sent messages queue
-	Queue<BitMessage::Header>		m_received_msg;				// Received messages queue
-	Queue<BitMessage::Header>		m_pending_ack;				// Pending acknokledges queue
-	Queue<BitMessage::Header>		m_acked;					// Acknowledges queue
-					
-
+	
+									// packet queues
+	Queue<PacketData>				m_sent_packet;					// Sent messages queue
+	Queue<PacketData>				m_received_packet;				// Received messages queue
+	Queue<PacketData>				m_pending_ack;					// Pending acknokledges queue
+	Queue<PacketData>				m_acked;						// Acknowledges queue
+	
+	Allocator*						m_allocator;					// dynamic allocator
+									
+									// constants
+	static const uint32_t			DEFAULT_PROTOCOL_ID	= 0xFFFFFFFF;
+	static const uint32_t			DEFAULT_TIMEOUT		= 10;
+	static const uint32_t			MAX_RATE			= 64000;
+	static const uint32_t			MAX_SEQUENCE		= 0xFFFF;
+	static const uint32_t			MAX_RTT				= 1000;
+	static const uint32_t			MAX_PACKET_LEN		= 1400;
+	static const uint32_t			MAX_MESSAGE_SIZE	= 16384;
+	static const uint32_t			MAX_QUEUE_SIZE		= 16384;
 
 };
 

+ 28 - 4
tests/connections.cpp

@@ -10,7 +10,7 @@ using namespace crown;
 
 const unsigned int server_port = 30000;
 const unsigned int client_port = 30001;
-const unsigned int protocol_id = 0x99887766;
+const uint32_t protocol_id = 0x00;
 const float delta_time = 1.0f / 45.0f;
 const real send_rate = 1.0f / 45.0f;
 
@@ -102,10 +102,21 @@ int main(int argc, char** argv)
 				int32_t bytes = connection.receive_message(received, delta_time);
 				if (bytes > 0)
 				{
+					uint8_t* header = received.get_header();
+					uint32_t protocol_id = header[0] << 24 | header[1] << 16 | header[2] << 8 | header[3];
+					uint16_t sequence = header[4] << 8 | header[5];
+					uint16_t ack = header[6] << 8 | header[7];
+					uint32_t ack_bits = header[8] << 24 | header[9] << 16 | header[10] << 8 | header[11];  
 					char string[6];
 					received.read_string(string, 6);
- 					os::printf("%s\n", string);
-					
+					os::printf("------------------\n");
+					os::printf("protocol_id: %d\n", protocol_id);
+					os::printf("sequence: %d\n", sequence);
+					os::printf("ack: %d\n", ack);
+					os::printf("ack_bits: %d\n", ack_bits);
+ 					os::printf("data: %s\n", string);
+					os::printf("------------------\n");
+
 				} 
 				
 				if (bytes <= 0)
@@ -122,10 +133,23 @@ int main(int argc, char** argv)
 				network::BitMessage message(alloc);
 			
 				message.init(6);
+				message.begin_writing();
 				message.write_string("prova", 6);
 				
 				connection.send_message(message, delta_time);
-				
+
+				uint8_t* header = message.get_header();
+				uint32_t protocol_id = header[0] << 24 | header[1] << 16 | header[2] << 8 | header[3];
+				uint16_t sequence = header[4] << 8 | header[5];
+				uint16_t ack = header[6] << 8 | header[7];
+				uint32_t ack_bits = header[8] << 24 | header[9] << 16 | header[10] << 8 | header[11];
+				os::printf("------------------\n");
+				os::printf("protocol_id: %d\n", protocol_id);
+				os::printf("sequence: %d\n", sequence);
+				os::printf("ack: %d\n", ack);
+				os::printf("ack_bits: %d\n", ack_bits);
+				os::printf("------------------\n");
+
 				send_acc -= 1.0f / send_rate;
 			}
 		}